package reactor.core.publisher;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.MultiProducer;
import reactor.core.Producer;
import reactor.core.Receiver;
import reactor.core.Trackable;
import reactor.core.publisher.EventLoopProcessor;
import reactor.core.publisher.RingBuffer;
import reactor.util.concurrent.QueueSupplier;

/* loaded from: input_file:reactor/core/publisher/EmitterProcessor.class */
public final class EmitterProcessor<T> extends FluxProcessor<T, T> implements MultiProducer, Receiver {
    /** Upper bound on the number of registered subscribers; enforced by {@code addInner}. */
    final int maxConcurrency;
    /** Size used to create the ring buffer; also reported by {@code getCapacity()}. */
    final int bufferSize;
    /** Replenish threshold, computed in the constructor as {@code max(1, bufferSize / 2)}. */
    final int limit;
    /** When true, removing the last subscriber swaps {@code subscribers} to {@code CANCELLED} and cancels upstream. */
    final boolean autoCancel;
    /** Upstream subscription; assigned in {@code onSubscribe}, cleared in {@code cancel()}. Plain (non-volatile) field. */
    Subscription upstreamSubscription;
    /** Ring buffer holding values that could not be delivered directly; created lazily by {@code getMainQueue()}. */
    private volatile RingBuffer<EventLoopProcessor.Slot<T>> emitBuffer;
    /** Set to true by {@code onError} / {@code onComplete}. */
    private volatile boolean done;
    /** First reported error; written through {@code ERROR} with a compare-and-set from null. */
    private volatile Throwable error;
    /** Current subscriber array; replaced as a whole through {@code SUBSCRIBERS} compare-and-set. */
    volatile EmitterSubscriber<?>[] subscribers;
    /** Counter guarding {@code drainLoop()}: only the caller that moves it from 0 runs the loop. */
    private volatile int running;
    /** Upstream demand accounting: starts at {@code bufferSize}, decremented in {@code onNext}, increased in {@code requestMore}. */
    private volatile int outstanding;
    /** True until {@code drainLoop()} has issued the initial {@code request(bufferSize)} to upstream. */
    boolean firstDrain = true;
    /** Sentinel for "no subscribers"; compared by identity, so it must stay distinct from {@code CANCELLED}. */
    static final EmitterSubscriber<?>[] EMPTY = new EmitterSubscriber[0];
    /** Sentinel for "auto-cancelled"; compared by identity, so it must stay distinct from {@code EMPTY}. */
    static final EmitterSubscriber<?>[] CANCELLED = new EmitterSubscriber[0];
    // The updaters below look the fields up reflectively by name: renaming a field requires updating the string.
    static final AtomicReferenceFieldUpdater<EmitterProcessor, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(EmitterProcessor.class, Throwable.class, "error");
    static final AtomicReferenceFieldUpdater<EmitterProcessor, EmitterSubscriber[]> SUBSCRIBERS = AtomicReferenceFieldUpdater.newUpdater(EmitterProcessor.class, EmitterSubscriber[].class, "subscribers");
    static final AtomicIntegerFieldUpdater<EmitterProcessor> RUNNING = AtomicIntegerFieldUpdater.newUpdater(EmitterProcessor.class, "running");
    static final AtomicIntegerFieldUpdater<EmitterProcessor> OUTSTANDING = AtomicIntegerFieldUpdater.newUpdater(EmitterProcessor.class, "outstanding");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/core/publisher/EmitterProcessor$EmitterSubscriber.class */
    public static final class EmitterSubscriber<T> implements Subscription, Trackable, Receiver, Producer {
        public static final long MASK_NOT_SUBSCRIBED = Long.MIN_VALUE;
        final EmitterProcessor<T> parent;
        final Subscriber<? super T> actual;
        volatile boolean done;
        boolean unbounded = false;
        private volatile long requested = Long.MIN_VALUE;
        volatile RingBuffer.Sequence pollCursor;
        static final AtomicLongFieldUpdater<EmitterSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(EmitterSubscriber.class, "requested");
        static final AtomicReferenceFieldUpdater<EmitterSubscriber, RingBuffer.Sequence> CURSOR = AtomicReferenceFieldUpdater.newUpdater(EmitterSubscriber.class, RingBuffer.Sequence.class, "pollCursor");

        public EmitterSubscriber(EmitterProcessor<T> emitterProcessor, Subscriber<? super T> subscriber) {
            this.actual = subscriber;
            this.parent = emitterProcessor;
        }

        public void request(long j) {
            if (Operators.checkRequest(j, this.actual)) {
                Operators.getAndAddCap(REQUESTED, this, j);
                if (EmitterProcessor.RUNNING.getAndIncrement(this.parent) == 0) {
                    this.parent.drainLoop();
                }
            }
        }

        public void cancel() {
            this.done = true;
            this.parent.drain();
        }

        @Override // reactor.core.Trackable
        public long requestedFromDownstream() {
            return this.requested;
        }

        @Override // reactor.core.Trackable
        public long getCapacity() {
            return this.parent.bufferSize;
        }

        void startTracking(long j) {
            RingBuffer.Sequence newSequence = RingBuffer.newSequence(j - 1);
            if (CURSOR.compareAndSet(this, null, newSequence)) {
                ((EmitterProcessor) this.parent).emitBuffer.addGatingSequence(newSequence);
            }
        }

        void start() {
            if (REQUESTED.compareAndSet(this, Long.MIN_VALUE, 0L)) {
                RingBuffer ringBuffer = ((EmitterProcessor) this.parent).emitBuffer;
                if (ringBuffer != null) {
                    startTracking(Math.max(0L, ringBuffer.getMinimumGatingSequence() + 1));
                }
                this.actual.onSubscribe(this);
            }
        }

        @Override // reactor.core.Trackable
        public boolean isCancelled() {
            return this.done;
        }

        @Override // reactor.core.Trackable
        public long getPending() {
            if (this.pollCursor == null || this.done) {
                return -1L;
            }
            return ((EmitterProcessor) this.parent).emitBuffer.getCursor() - this.pollCursor.getAsLong();
        }

        @Override // reactor.core.Trackable
        public boolean isStarted() {
            return this.parent.isStarted();
        }

        @Override // reactor.core.Trackable
        public boolean isTerminated() {
            return this.parent.isTerminated();
        }

        @Override // reactor.core.Receiver
        public Object upstream() {
            return this.parent;
        }

        @Override // reactor.core.Producer
        public Subscriber<? super T> downstream() {
            return this.actual;
        }
    }

    /**
     * Creates a processor with buffer size {@code QueueSupplier.SMALL_BUFFER_SIZE}, a max
     * concurrency of {@code Integer.MAX_VALUE} and auto-cancel enabled.
     */
    public static <E> EmitterProcessor<E> create() {
        return create(QueueSupplier.SMALL_BUFFER_SIZE, Integer.MAX_VALUE, true);
    }

    /**
     * Creates a processor with buffer size {@code QueueSupplier.SMALL_BUFFER_SIZE} and a max
     * concurrency of {@code Integer.MAX_VALUE}.
     *
     * @param z the auto-cancel flag
     */
    public static <E> EmitterProcessor<E> create(boolean z) {
        return create(QueueSupplier.SMALL_BUFFER_SIZE, Integer.MAX_VALUE, z);
    }

    /**
     * Creates an auto-cancelling processor with a max concurrency of {@code Integer.MAX_VALUE}.
     *
     * @param i the buffer size
     */
    public static <E> EmitterProcessor<E> create(int i) {
        return create(i, Integer.MAX_VALUE, true);
    }

    /**
     * Creates an auto-cancelling processor.
     *
     * @param i  the buffer size
     * @param i2 the maximum number of concurrent subscribers
     */
    public static <E> EmitterProcessor<E> create(int i, int i2) {
        return new EmitterProcessor<E>(true, i2, i);
    }

    /**
     * Creates a processor with a max concurrency of {@code Integer.MAX_VALUE}.
     *
     * @param i the buffer size
     * @param z the auto-cancel flag
     */
    public static <E> EmitterProcessor<E> create(int i, boolean z) {
        return new EmitterProcessor<E>(z, Integer.MAX_VALUE, i);
    }

    /**
     * Creates a processor from all three settings. Note the constructor takes them in a
     * different order (auto-cancel, max concurrency, buffer size).
     *
     * @param i  the buffer size
     * @param i2 the maximum number of concurrent subscribers
     * @param z  the auto-cancel flag
     */
    public static <E> EmitterProcessor<E> create(int i, int i2, boolean z) {
        final EmitterProcessor<E> processor = new EmitterProcessor<E>(z, i2, i);
        return processor;
    }

    /** Returns the current upstream subscription; null before {@code onSubscribe} or after {@code cancel()} cleared it. */
    @Override
    public Subscription upstream() {
        return upstreamSubscription;
    }

    EmitterProcessor(boolean z, int i, int i2) {
        this.autoCancel = z;
        this.maxConcurrency = i;
        this.bufferSize = i2;
        this.limit = Math.max(1, i2 / 2);
        OUTSTANDING.lazySet(this, i2);
        SUBSCRIBERS.lazySet(this, EMPTY);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Registers a new downstream subscriber. The inner is started immediately only when an
     * upstream subscription is already present; otherwise {@code onSubscribe} starts it later.
     * A failure during registration is ignored when it is a cancel exception; any other failure
     * removes the inner again and is reported to the subscriber through {@code Operators.error}.
     */
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        super.subscribe(subscriber);
        final EmitterSubscriber<T> inner = new EmitterSubscriber<>(this, subscriber);
        try {
            addInner(inner);
            if (upstreamSubscription != null) {
                inner.start();
            }
        } catch (Throwable failure) {
            if (!Exceptions.isCancel(failure)) {
                removeInner(inner, EMPTY);
                Operators.error(subscriber, failure);
            }
        }
    }

    /**
     * Subscribes this processor to {@code Operators.emptySubscription()} so it can be fed by
     * direct {@code onNext} calls, then returns this instance for chaining.
     */
    @Override
    public EmitterProcessor<T> connect() {
        final Subscription none = Operators.emptySubscription();
        onSubscribe(none);
        return this;
    }

    /** Returns the ring buffer's pending count, or -1 while no ring buffer has been created. */
    @Override
    public long getPending() {
        final RingBuffer<EventLoopProcessor.Slot<T>> ring = emitBuffer;
        return ring == null ? -1L : ring.getPending();
    }

    /**
     * Receives a value from upstream and either hands it directly to subscribers that have demand
     * and are not yet reading from the ring buffer, or stores it in the ring buffer for the drain
     * loop to deliver.
     *
     * <p>Compared to the decompiled original, the bogus {@code (EmitterProcessor<T>)} casts on the
     * value passed to {@code buffer} were removed ({@code buffer} takes a {@code T}), and each
     * subscriber is narrowed to {@code EmitterSubscriber<T>} so that {@code actual.onNext(t)}
     * type-checks. Control flow is unchanged.
     *
     * @param t the value to emit; must not be null
     * @throws RuntimeException the exception built by {@code Exceptions.argumentIsNullException()} when {@code t} is null
     */
    public void onNext(T t) {
        if (t == null) {
            throw Exceptions.argumentIsNullException();
        }
        EmitterSubscriber<?>[] inners = this.subscribers;
        if (this.autoCancel && inners == CANCELLED) {
            // Already auto-cancelled: silently drop the value.
            return;
        }
        int count = inners.length;
        if (count == 0) {
            // Nobody is listening yet: keep the value in the ring buffer.
            buffer(t);
            return;
        }
        // Ring buffer sequence of this value once it has been buffered; -1 while it has not.
        long bufferedAt = -1;
        if (this.upstreamSubscription != Operators.emptySubscription()) {
            // Demand accounting only applies to a real upstream (connect() installs the empty subscription).
            if (this.outstanding == 0) {
                buffer(t);
                drain();
                return;
            }
            OUTSTANDING.decrementAndGet(this);
        }
        for (int i = 0; i < count; i++) {
            // The array is declared with a wildcard element type but only ever holds inners of this processor.
            @SuppressWarnings("unchecked")
            EmitterSubscriber<T> inner = (EmitterSubscriber<T>) inners[i];
            if (inner.done) {
                removeInner(inner, this.autoCancel ? CANCELLED : EMPTY);
                if (this.autoCancel && this.subscribers == CANCELLED) {
                    // Last subscriber gone: cancel upstream, but only if no drain is in progress.
                    if (RUNNING.compareAndSet(this, 0, 1)) {
                        cancel();
                        return;
                    }
                    return;
                }
            } else {
                long demand = inner.requested;
                inner.unbounded = demand == Long.MAX_VALUE;
                RingBuffer.Sequence cursor = inner.unbounded ? null : inner.pollCursor;
                if (demand > 0 && cursor == null) {
                    // Fast path: the inner has demand and is not reading from the ring buffer.
                    if (demand != Long.MAX_VALUE) {
                        EmitterSubscriber.REQUESTED.decrementAndGet(inner);
                    }
                    inner.actual.onNext(t);
                } else if (bufferedAt == -1) {
                    // First inner that cannot take the value directly: buffer it once and make
                    // the inners visited so far (including this one) track the ring buffer.
                    bufferedAt = buffer(t);
                    startAllTrackers(inners, bufferedAt, i + 1);
                } else if (cursor == null) {
                    inner.startTracking(bufferedAt);
                }
            }
        }
        if (RUNNING.getAndIncrement(this) != 0) {
            return;
        }
        drainLoop();
    }

    /**
     * Terminates the processor with an error: records it (first error wins), marks the processor
     * done and drains so subscribers get the terminal signal.
     *
     * <p>When auto-cancel is enabled and the processor is already done, the error is handed to
     * {@code Operators.onErrorDropped} and nothing else happens. The explicit {@code return}
     * makes this hold even if {@code onErrorDropped} returns normally instead of throwing;
     * previously the dropped error would still have been recorded and drained in that case.
     *
     * @param th the error; must not be null
     * @throws RuntimeException the exception built by {@code Exceptions.argumentIsNullException()} when {@code th} is null
     */
    public void onError(Throwable th) {
        if (th == null) {
            throw Exceptions.argumentIsNullException();
        }
        if (this.autoCancel && this.done) {
            Operators.onErrorDropped(th);
            return;
        }
        reportError(th);
        this.done = true;
        drain();
    }

    /** Marks the processor done and drains; repeated calls after termination are ignored. */
    public void onComplete() {
        if (!done) {
            done = true;
            drain();
        }
    }

    /**
     * Stores the upstream subscription once {@code Operators.validate} accepts it, then starts
     * every subscriber that registered earlier. A failure while starting them is routed
     * through {@code Operators.onOperatorError} into {@link #onError(Throwable)}.
     */
    public void onSubscribe(Subscription subscription) {
        if (!Operators.validate(upstreamSubscription, subscription)) {
            return;
        }
        upstreamSubscription = subscription;
        try {
            final EmitterSubscriber<?>[] waiting = subscribers;
            if (waiting == CANCELLED || waiting.length == 0) {
                return;
            }
            for (int k = 0; k < waiting.length; k++) {
                waiting[k].start();
            }
        } catch (Throwable failure) {
            onError(Operators.onOperatorError(subscription, failure));
        }
    }

    /** Returns the recorded error, or null if none has been reported. */
    @Override
    public Throwable getError() {
        return error;
    }

    /** Returns true only when auto-cancel is enabled and the subscriber array is the {@code CANCELLED} sentinel. */
    @Override
    public boolean isCancelled() {
        if (!autoCancel) {
            return false;
        }
        return subscribers == CANCELLED;
    }

    /** Returns the configured buffer size. */
    @Override
    public final long getCapacity() {
        final long capacity = bufferSize;
        return capacity;
    }

    /** Returns true while an upstream subscription is held. */
    @Override
    public boolean isStarted() {
        return null != upstreamSubscription;
    }

    /** Returns true once the processor is done and the ring buffer is absent or reports nothing pending. */
    @Override
    public boolean isTerminated() {
        if (!done) {
            return false;
        }
        final RingBuffer<EventLoopProcessor.Slot<T>> ring = emitBuffer;
        return ring == null || ring.getPending() == 0;
    }

    /** Returns the replenish threshold computed in the constructor. */
    @Override
    public long limit() {
        final long threshold = limit;
        return threshold;
    }

    /** Returns the current value of the outstanding-demand counter. */
    @Override
    public long expectedFromUpstream() {
        final long pendingDemand = outstanding;
        return pendingDemand;
    }

    /**
     * Returns the ring buffer, creating a single-producer one of {@code bufferSize} on first
     * use. The check-then-create is not atomic.
     */
    RingBuffer<EventLoopProcessor.Slot<T>> getMainQueue() {
        final RingBuffer<EventLoopProcessor.Slot<T>> existing = emitBuffer;
        if (existing != null) {
            return existing;
        }
        final RingBuffer<EventLoopProcessor.Slot<T>> created = EventLoopProcessor.createSingleProducer(bufferSize);
        emitBuffer = created;
        return created;
    }

    /**
     * Stores a value in the ring buffer (creating the buffer if needed) and publishes it.
     *
     * @param t the value to store
     * @return the ring buffer sequence the value was published at
     */
    final long buffer(T t) {
        final RingBuffer<EventLoopProcessor.Slot<T>> queue = getMainQueue();
        final long sequence = queue.next();
        final EventLoopProcessor.Slot<T> slot = queue.get(sequence);
        slot.value = t;
        queue.publish(sequence);
        return sequence;
    }

    /** Increments the {@code running} counter and runs {@link #drainLoop()} only if it was 0 before. */
    final void drain() {
        if (RUNNING.getAndIncrement(this) != 0) {
            // Another caller owns the loop; the increment tells it to go round again.
            return;
        }
        drainLoop();
    }

    /**
     * Serialized drain loop; must only be entered by the caller that moved {@code running} from 0.
     * Each pass removes cancelled subscribers, delivers buffered values to subscribers that track
     * the ring buffer and have demand, sends the terminal signal once the processor is done, and
     * finally requests more from upstream.
     *
     * <p>Fix compared to the original: the loop no longer returns early when the processor is
     * done and the subscriber array is {@code EMPTY}. That return left {@code running} above 0,
     * so every later {@code drain()} became a no-op and a subscriber registering after
     * completion never received its buffered values or the terminal signal. With an empty array
     * the pass now simply does no work and releases {@code running} like any other pass.
     *
     * <p>The two cancellation exits still return without releasing {@code running}, as before.
     * Subscribers are narrowed to {@code EmitterSubscriber<T>} so that the calls to
     * {@code actual.onNext} and {@code checkTerminal} type-check.
     */
    final void drainLoop() {
        int missed = 1;
        RingBuffer<EventLoopProcessor.Slot<T>> ring = null;
        do {
            EmitterSubscriber<?>[] inners = this.subscribers;
            if (inners == CANCELLED) {
                cancel();
                return;
            }
            // Snapshot taken before visiting subscribers; decides whether terminal checks run in this pass.
            boolean terminated = this.done;
            if (inners.length != 0) {
                for (EmitterSubscriber<?> candidate : inners) {
                    // The array is declared with a wildcard element type but only ever holds inners of this processor.
                    @SuppressWarnings("unchecked")
                    EmitterSubscriber<T> inner = (EmitterSubscriber<T>) candidate;
                    long demand = inner.requested;
                    if (inner.done) {
                        removeInner(inner, this.autoCancel ? CANCELLED : EMPTY);
                        if (this.autoCancel && this.subscribers == CANCELLED) {
                            cancel();
                            return;
                        }
                    } else {
                        if (ring == null) {
                            ring = this.emitBuffer;
                        }
                        RingBuffer.Sequence cursor = inner.pollCursor;
                        // Demand left for this inner after the pass; 0 when nothing was attempted.
                        long remaining;
                        if (inner.unbounded || cursor == null || demand <= 0) {
                            remaining = 0;
                        } else {
                            remaining = demand;
                            boolean unboundedDemand = remaining == Long.MAX_VALUE;
                            long position = cursor.getAsLong();
                            long available = ring.getCursor();
                            while (remaining != 0) {
                                position++;
                                if (available < position) {
                                    break;
                                }
                                T value = ring.get(position).value;
                                // Advance the cursor before emitting, as the original does.
                                cursor.set(position);
                                inner.actual.onNext(value);
                                if (!unboundedDemand) {
                                    remaining--;
                                }
                            }
                            if (!unboundedDemand && demand > remaining) {
                                // Subtract the number of values actually delivered.
                                EmitterSubscriber.REQUESTED.addAndGet(inner, remaining - demand);
                            }
                        }
                        if (terminated) {
                            checkTerminal(inner, cursor, remaining);
                        }
                    }
                }
                if (!this.done && this.firstDrain) {
                    // First pass with subscribers: issue the initial upstream request.
                    Subscription upstream = this.upstreamSubscription;
                    if (upstream != null) {
                        this.firstDrain = false;
                        upstream.request(this.bufferSize);
                    }
                } else if (ring != null) {
                    requestMore(ring.getPending());
                } else {
                    requestMore(0);
                }
            }
            missed = RUNNING.addAndGet(this, -missed);
        } while (missed != 0);
    }

    /**
     * Sends the terminal signal to one subscriber unless it still has buffered values to read.
     * Termination is postponed only when all of the following hold: there is no error or
     * {@code j} is non-zero, the subscriber has a cursor, it is not unbounded, and its cursor is
     * behind the ring buffer cursor. Otherwise the subscriber is removed and, unless it was
     * cancelled, receives {@code onError} with the recorded error or {@code onComplete}.
     *
     * @param emitterSubscriber the subscriber to check
     * @param sequence          the subscriber's ring buffer cursor, may be null
     * @param j                 the demand value computed for this subscriber by the drain pass
     */
    final void checkTerminal(EmitterSubscriber<T> emitterSubscriber, RingBuffer.Sequence sequence, long j) {
        final Throwable failure = this.error;
        final boolean mayPostpone = failure == null || j != 0;
        if (mayPostpone
                && sequence != null
                && !emitterSubscriber.unbounded
                && sequence.getAsLong() < this.emitBuffer.getCursor()) {
            return;
        }
        removeInner(emitterSubscriber, EMPTY);
        if (!emitterSubscriber.done) {
            if (failure != null) {
                emitterSubscriber.actual.onError(failure);
            } else {
                emitterSubscriber.actual.onComplete();
            }
        }
    }

    /**
     * Makes the first {@code i} subscribers of the array track the ring buffer. Those before
     * index {@code i - 1} that have no cursor yet start at {@code j + 1}; the subscriber at
     * index {@code i - 1} starts at {@code j}.
     *
     * @param emitterSubscriberArr the subscriber snapshot being iterated by the caller
     * @param j                    the sequence of the value that was just buffered
     * @param i                    the number of leading subscribers to process (at least 1)
     */
    final void startAllTrackers(EmitterSubscriber<?>[] emitterSubscriberArr, long j, int i) {
        final int last = i - 1;
        int index = 0;
        while (index < last) {
            final EmitterSubscriber<?> earlier = emitterSubscriberArr[index];
            if (earlier.pollCursor == null) {
                earlier.startTracking(j + 1);
            }
            index++;
        }
        emitterSubscriberArr[last].startTracking(j);
    }

    /** Records the error only if none has been recorded yet (compare-and-set from null). */
    final void reportError(Throwable th) {
        final Throwable none = null;
        ERROR.compareAndSet(this, none, th);
    }

    /**
     * Registers a subscriber by swapping in a subscriber array that is one element longer.
     *
     * <p>Fix compared to the original: when the processor is already in the {@code CANCELLED}
     * state, the downstream is completed through an empty {@code Flux} and the method returns.
     * The original fell through and still registered the inner (replacing the {@code CANCELLED}
     * sentinel), so the same downstream could get a second {@code onSubscribe} and further
     * signals after {@code onComplete}, and could be subscribed to the empty {@code Flux} again
     * on every compare-and-set retry.
     *
     * @param emitterSubscriber the inner to register
     * @throws RuntimeException the exception built by {@code Exceptions.failWithOverflow()} when
     *                          registering would exceed {@code maxConcurrency}
     */
    final void addInner(EmitterSubscriber<T> emitterSubscriber) {
        EmitterSubscriber<?>[] current;
        EmitterSubscriber<?>[] grown;
        do {
            current = this.subscribers;
            if (current == CANCELLED) {
                // Claim the "not subscribed -> started" transition so that a start() issued by the
                // caller after this method returns is a no-op and cannot call onSubscribe again.
                EmitterSubscriber.REQUESTED.compareAndSet(emitterSubscriber, EmitterSubscriber.MASK_NOT_SUBSCRIBED, 0L);
                Flux.<T>empty().subscribe(emitterSubscriber.actual);
                return;
            }
            int size = current.length;
            if (size + 1 > this.maxConcurrency) {
                throw Exceptions.failWithOverflow();
            }
            grown = new EmitterSubscriber[size + 1];
            System.arraycopy(current, 0, grown, 0, size);
            grown[size] = emitterSubscriber;
        } while (!SUBSCRIBERS.compareAndSet(this, current, grown));
    }

    /**
     * Unregisters a subscriber by swapping in a subscriber array without it, retrying on
     * contention. Returns without doing anything when the array is already one of the sentinels
     * or does not contain the subscriber. After a successful swap, the subscriber's cursor (if
     * any) is removed from the ring buffer's gating sequences.
     *
     * @param emitterSubscriber    the inner to remove
     * @param emitterSubscriberArr the array to install when the last inner is removed
     *                             (callers pass {@code EMPTY} or {@code CANCELLED})
     */
    final void removeInner(EmitterSubscriber<?> emitterSubscriber, EmitterSubscriber<?>[] emitterSubscriberArr) {
        for (;;) {
            final EmitterSubscriber<?>[] current = this.subscribers;
            if (current == CANCELLED || current == EMPTY) {
                return;
            }
            final int size = current.length;
            int found = -1;
            for (int k = 0; k < size; k++) {
                if (current[k] == emitterSubscriber) {
                    found = k;
                    break;
                }
            }
            if (found < 0) {
                return;
            }
            final EmitterSubscriber<?>[] replacement;
            if (size == 1) {
                replacement = emitterSubscriberArr;
            } else {
                replacement = new EmitterSubscriber[size - 1];
                System.arraycopy(current, 0, replacement, 0, found);
                System.arraycopy(current, found + 1, replacement, found, (size - found) - 1);
            }
            if (SUBSCRIBERS.compareAndSet(this, current, replacement)) {
                break;
            }
        }
        final RingBuffer.Sequence cursor = emitterSubscriber.pollCursor;
        if (cursor != null) {
            getMainQueue().removeGatingSequence(cursor);
        }
    }

    /**
     * Requests more values from upstream when all of the following hold: the upstream is not the
     * empty subscription, {@code i} is below the buffer size, outstanding demand is at or below
     * {@code limit}, and the computed amount {@code bufferSize - outstanding - i} is positive.
     *
     * @param i the number of values currently pending in the ring buffer
     */
    final void requestMore(int i) {
        final Subscription upstream = this.upstreamSubscription;
        if (upstream == Operators.emptySubscription() || i >= this.bufferSize) {
            return;
        }
        final int pendingDemand = this.outstanding;
        if (pendingDemand > this.limit) {
            return;
        }
        final int toRequest = (this.bufferSize - pendingDemand) - i;
        if (toRequest <= 0 || upstream == null) {
            return;
        }
        OUTSTANDING.addAndGet(this, toRequest);
        upstream.request(toRequest);
    }

    /** Clears and cancels the upstream subscription; does nothing when done or when there is no upstream. */
    final void cancel() {
        if (this.done) {
            return;
        }
        final Subscription upstream = this.upstreamSubscription;
        if (upstream == null) {
            return;
        }
        this.upstreamSubscription = null;
        upstream.cancel();
    }

    /** Returns an iterator over the subscriber array as read at call time. */
    @Override
    public Iterator<?> downstreams() {
        final EmitterSubscriber<?>[] snapshot = subscribers;
        return Arrays.asList(snapshot).iterator();
    }

    /** Returns the length of the current subscriber array (0 for both sentinels). */
    @Override
    public long downstreamCount() {
        final EmitterSubscriber<?>[] snapshot = subscribers;
        return snapshot.length;
    }

    /**
     * Returns a diagnostic summary of the form
     * {@code {done: ..., error: '...', outstanding: ..., pending: ...}}; the {@code error} part is
     * present only when an error has been recorded.
     *
     * <p>Fix compared to the original: the error part no longer carries its own trailing
     * separator, which produced {@code "', , outstanding"} in the output. The volatile
     * {@code error} field is also read once, so the null check and the message come from the
     * same value.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("{done: ").append(this.done);
        final Throwable failure = this.error;
        if (failure != null) {
            text.append(", error: '").append(failure.getMessage()).append('\'');
        }
        text.append(", outstanding: ").append(this.outstanding);
        text.append(", pending: ").append(this.emitBuffer);
        return text.append('}').toString();
    }
}
