package reactor.core.publisher;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Operators;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/core/publisher/FluxWindowStartEnd.class */
public final class FluxWindowStartEnd<T, U, V> extends FluxSource<T, Flux<T>> {
    final Publisher<U> start;
    final Function<? super U, ? extends Publisher<V>> end;
    final Supplier<? extends Queue<Object>> drainQueueSupplier;
    final Supplier<? extends Queue<T>> processorQueueSupplier;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/core/publisher/FluxWindowStartEnd$NewWindow.class */
    /**
     * Immutable marker placed on the drain queue to request that a new window be opened.
     * It carries the element emitted by the "start" publisher so the drain loop can later
     * hand it to the window-closing function.
     *
     * @param <U> type of the element emitted by the start publisher
     */
    public static final class NewWindow<U> {
        /** The start element that triggered this window; stored as given (may be {@code null}). */
        final U value;

        /**
         * Wraps the given start element.
         *
         * @param value the start element to carry
         */
        public NewWindow(U value) {
            this.value = value;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/core/publisher/FluxWindowStartEnd$WindowStartEndEnder.class */
    public static final class WindowStartEndEnder<T, V> extends Operators.DeferredSubscription implements Subscriber<V> {
        final WindowStartEndMainSubscriber<T, ?, V> main;
        final UnicastProcessor<T> window;

        public WindowStartEndEnder(WindowStartEndMainSubscriber<T, ?, V> windowStartEndMainSubscriber, UnicastProcessor<T> unicastProcessor) {
            this.main = windowStartEndMainSubscriber;
            this.window = unicastProcessor;
        }

        public void onSubscribe(Subscription subscription) {
            if (set(subscription)) {
                subscription.request(Long.MAX_VALUE);
            }
        }

        public void onNext(V v) {
            cancel();
            this.main.endSignal(this);
        }

        public void onError(Throwable th) {
            this.main.endError(th);
        }

        public void onComplete() {
            this.main.endSignal(this);
        }
    }

    /* loaded from: input_file:reactor/core/publisher/FluxWindowStartEnd$WindowStartEndMainSubscriber.class */
    /**
     * Coordinator of the window-start/window-end operator.
     *
     * <p>It subscribes to the main source itself, owns the {@link WindowStartEndStarter}
     * listening to the "start" publisher, and creates one {@link WindowStartEndEnder} per
     * opened window. Source elements, {@link NewWindow} markers and ender signals are all
     * offered to a single {@link #queue} and processed by {@link #drain()}, which is
     * serialized through the {@link #wip} counter so that only one thread at a time runs
     * the loop.
     *
     * <p>{@link #open} counts the main/start side (initial value 1, released once via
     * {@link #closeMain()}) plus one per opened window. When it reaches zero in
     * {@link #dispose()} the main subscription is terminated.
     *
     * @param <T> element type of the source and of the emitted windows
     * @param <U> element type of the start publisher
     * @param <V> element type of the end publishers
     */
    static final class WindowStartEndMainSubscriber<T, U, V> implements Subscriber<T>, Subscription, Disposable {
        /** Downstream subscriber receiving the windows. */
        final Subscriber<? super Flux<T>> actual;
        /** Mixed queue of source elements, {@link NewWindow} markers and {@link WindowStartEndEnder}s. */
        final Queue<Object> queue;
        /** Maps a start element to the publisher whose signal closes the corresponding window. */
        final Function<? super U, ? extends Publisher<V>> end;
        /** Supplies the backing queue for every new window processor. */
        final Supplier<? extends Queue<T>> processorQueueSupplier;
        /** Outstanding downstream demand for windows. */
        volatile long requested;
        /** Work-in-progress counter guarding {@link #drain()}. */
        volatile int wip;
        volatile boolean cancelled;
        /** Subscription to the main source. */
        volatile Subscription s;
        /** 0/1 flag making sure {@link #closeMain()} releases the main slot only once. */
        volatile int once;
        volatile boolean mainDone;
        volatile Throwable error;
        @SuppressWarnings("rawtypes")
        static final AtomicLongFieldUpdater<WindowStartEndMainSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(WindowStartEndMainSubscriber.class, "requested");
        @SuppressWarnings("rawtypes")
        static final AtomicIntegerFieldUpdater<WindowStartEndMainSubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(WindowStartEndMainSubscriber.class, "wip");
        @SuppressWarnings("rawtypes")
        static final AtomicReferenceFieldUpdater<WindowStartEndMainSubscriber, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(WindowStartEndMainSubscriber.class, Subscription.class, "s");
        @SuppressWarnings("rawtypes")
        static final AtomicIntegerFieldUpdater<WindowStartEndMainSubscriber> ONCE = AtomicIntegerFieldUpdater.newUpdater(WindowStartEndMainSubscriber.class, "once");
        @SuppressWarnings("rawtypes")
        static final AtomicIntegerFieldUpdater<WindowStartEndMainSubscriber> OPEN = AtomicIntegerFieldUpdater.newUpdater(WindowStartEndMainSubscriber.class, "open");
        @SuppressWarnings("rawtypes")
        static final AtomicReferenceFieldUpdater<WindowStartEndMainSubscriber, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(WindowStartEndMainSubscriber.class, Throwable.class, "error");
        /** Subscriber for the start publisher; also used as the monitor guarding {@link #windowEnds}. */
        final WindowStartEndStarter<T, U, V> starter = new WindowStartEndStarter<>(this);
        /** Active enders; set to {@code null} by {@link #removeAll()}. Guarded by {@code synchronized (starter)}. */
        Set<WindowStartEndEnder<T, V>> windowEnds = new HashSet<>();
        /** Currently open windows. Only read and written inside {@link #drain()}. */
        Set<UnicastProcessor<T>> windows = new HashSet<>();
        /** Number of open participants: 1 for the main/start side plus one per opened window. */
        volatile int open = 1;

        /**
         * @param subscriber downstream subscriber receiving the windows
         * @param queue      queue used to serialize all incoming signals
         * @param function   maps a start element to the publisher that closes its window
         * @param supplier   supplies the backing queue of each window processor
         */
        WindowStartEndMainSubscriber(Subscriber<? super Flux<T>> subscriber, Queue<Object> queue, Function<? super U, ? extends Publisher<V>> function, Supplier<? extends Queue<T>> supplier) {
            this.actual = subscriber;
            this.queue = queue;
            this.end = function;
            this.processorQueueSupplier = supplier;
        }

        /** Stores the main subscription and, if accepted by {@code Operators.setOnce}, requests an unbounded amount. */
        @Override
        public void onSubscribe(Subscription subscription) {
            if (Operators.setOnce(S, this, subscription)) {
                subscription.request(Long.MAX_VALUE);
            }
        }

        /** Enqueues a source element and triggers the drain loop. */
        @Override
        public void onNext(T t) {
            // Several producers (source, starter, enders) offer to the same queue, hence the monitor.
            synchronized (this) {
                this.queue.offer(t);
            }
            drain();
        }

        /** Records a source error; if it could not be recorded it is reported as dropped. */
        @Override
        public void onError(Throwable th) {
            if (Exceptions.addThrowable(ERROR, this, th)) {
                drain();
            } else {
                Operators.onErrorDropped(th);
            }
        }

        /** Releases the main slot, cancels the starter and marks the main source as done. */
        @Override
        public void onComplete() {
            closeMain();
            this.starter.cancel();
            this.mainDone = true;
            drain();
        }

        /** Accumulates downstream demand for windows (validated by {@code Operators.validate}). */
        @Override
        public void request(long j) {
            if (Operators.validate(j)) {
                Operators.getAndAddCap(REQUESTED, this, j);
            }
        }

        /** Marks the sequence as cancelled, cancels the starter and releases the main slot. */
        @Override
        public void cancel() {
            this.cancelled = true;
            this.starter.cancel();
            closeMain();
        }

        /** Called by the starter for every start element: enqueues a {@link NewWindow} marker. */
        void starterNext(U u) {
            NewWindow<U> newWindow = new NewWindow<>(u);
            synchronized (this) {
                this.queue.offer(newWindow);
            }
            drain();
        }

        /** Called by the starter on error; same handling as a source error. */
        void starterError(Throwable th) {
            if (Exceptions.addThrowable(ERROR, this, th)) {
                drain();
            } else {
                Operators.onErrorDropped(th);
            }
        }

        /** Called by the starter on completion: releases the main slot and drains. */
        void starterComplete() {
            closeMain();
            drain();
        }

        /** Called by an ender when its window must close: unregisters it and enqueues it for the drain loop. */
        void endSignal(WindowStartEndEnder<T, V> windowStartEndEnder) {
            remove(windowStartEndEnder);
            synchronized (this) {
                this.queue.offer(windowStartEndEnder);
            }
            drain();
        }

        /** Called by an ender on error; same handling as a source error. */
        void endError(Throwable th) {
            if (Exceptions.addThrowable(ERROR, this, th)) {
                drain();
            } else {
                Operators.onErrorDropped(th);
            }
        }

        /** Releases the main/start slot of {@link #open} at most once. */
        void closeMain() {
            if (ONCE.compareAndSet(this, 0, 1)) {
                dispose();
            }
        }

        /**
         * Releases one slot of {@link #open}; terminates the main subscription when none remain.
         * This instance is also passed to every window processor on creation
         * (presumably as its on-terminate callback; confirm against {@code UnicastProcessor}).
         */
        @Override // reactor.core.Disposable, reactor.core.Cancellation
        public void dispose() {
            if (OPEN.decrementAndGet(this) == 0) {
                Operators.terminate(S, this);
            }
        }

        /**
         * Registers an ender.
         *
         * @return {@code true} if registered; {@code false} (after cancelling the ender) if
         *         {@link #removeAll()} has already run
         */
        boolean add(WindowStartEndEnder<T, V> windowStartEndEnder) {
            synchronized (this.starter) {
                Set<WindowStartEndEnder<T, V>> set = this.windowEnds;
                if (set != null) {
                    set.add(windowStartEndEnder);
                    return true;
                }
                windowStartEndEnder.cancel();
                return false;
            }
        }

        /** Unregisters an ender; no-op once {@link #removeAll()} has run. */
        void remove(WindowStartEndEnder<T, V> windowStartEndEnder) {
            synchronized (this.starter) {
                Set<WindowStartEndEnder<T, V>> set = this.windowEnds;
                if (set != null) {
                    set.remove(windowStartEndEnder);
                }
            }
        }

        /** Cancels every registered ender and disables further registration. */
        void removeAll() {
            synchronized (this.starter) {
                Set<WindowStartEndEnder<T, V>> set = this.windowEnds;
                if (set == null) {
                    return;
                }
                this.windowEnds = null;
                for (WindowStartEndEnder<T, V> ender : set) {
                    ender.cancel();
                }
            }
        }

        /**
         * Serialized processing loop. Only the caller that moves {@link #wip} from 0 to 1 runs
         * the loop; others just bump the counter so the running loop makes another pass.
         *
         * <p>Both terminal paths (completion and error) return without resetting {@link #wip},
         * so no later call can re-enter the loop after {@link #windows} has been nulled.
         */
        @SuppressWarnings("unchecked")
        void drain() {
            if (WIP.getAndIncrement(this) != 0) {
                return;
            }
            final Subscriber<? super Flux<T>> subscriber = this.actual;
            final Queue<Object> q = this.queue;
            int missed = 1;
            while (this.error == null) {
                if (this.mainDone || this.open == 0) {
                    // Terminal completion: stop all enders, complete open windows, then downstream.
                    removeAll();
                    for (UnicastProcessor<T> window : this.windows) {
                        window.onComplete();
                    }
                    this.windows = null;
                    subscriber.onComplete();
                    return;
                }
                Object signal = q.poll();
                if (signal == null) {
                    missed = WIP.addAndGet(this, -missed);
                    if (missed == 0) {
                        return;
                    }
                } else if (signal instanceof NewWindow) {
                    if (!this.cancelled && this.open != 0 && !this.mainDone) {
                        // Only NewWindow<U> instances are enqueued (see starterNext).
                        NewWindow<U> newWindow = (NewWindow<U>) signal;
                        Queue<T> processorQueue = this.processorQueueSupplier.get();
                        try {
                            Publisher<V> closer = Objects.requireNonNull(this.end.apply(newWindow.value), "The end returned a null publisher");
                            OPEN.getAndIncrement(this);
                            UnicastProcessor<T> window = new UnicastProcessor<>(processorQueue, this);
                            WindowStartEndEnder<T, V> ender = new WindowStartEndEnder<>(this, window);
                            this.windows.add(window);
                            if (add(ender)) {
                                long demand = this.requested;
                                if (demand != 0) {
                                    subscriber.onNext(window);
                                    if (demand != Long.MAX_VALUE) {
                                        REQUESTED.decrementAndGet(this);
                                    }
                                    closer.subscribe(ender);
                                } else {
                                    Exceptions.addThrowable(ERROR, this, Exceptions.failWithOverflow("Could not emit window due to lack of requests"));
                                }
                            }
                        } catch (Throwable th) {
                            Exceptions.addThrowable(ERROR, this, Operators.onOperatorError(this.s, th, newWindow.value));
                        }
                    }
                } else if (signal instanceof WindowStartEndEnder) {
                    // Only enders created by this subscriber are enqueued (see endSignal).
                    WindowStartEndEnder<T, V> ender = (WindowStartEndEnder<T, V>) signal;
                    // Forget the window before completing it; otherwise every closed window stays
                    // referenced (and is iterated for each element) until the whole sequence ends.
                    this.windows.remove(ender.window);
                    ender.window.onComplete();
                } else {
                    // Anything else on the queue is a source element offered by onNext(T).
                    T value = (T) signal;
                    for (UnicastProcessor<T> window : this.windows) {
                        window.onNext(value);
                    }
                }
            }
            // An error was recorded: claim it and tear everything down exactly once.
            Throwable terminate = Exceptions.terminate(ERROR, this);
            if (terminate != Exceptions.TERMINATED) {
                Operators.terminate(S, this);
                this.starter.cancel();
                removeAll();
                for (UnicastProcessor<T> window : this.windows) {
                    window.onError(terminate);
                }
                this.windows = null;
                q.clear();
                subscriber.onError(terminate);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/core/publisher/FluxWindowStartEnd$WindowStartEndStarter.class */
    public static final class WindowStartEndStarter<T, U, V> extends Operators.DeferredSubscription implements Subscriber<U> {
        final WindowStartEndMainSubscriber<T, U, V> main;

        public WindowStartEndStarter(WindowStartEndMainSubscriber<T, U, V> windowStartEndMainSubscriber) {
            this.main = windowStartEndMainSubscriber;
        }

        public void onSubscribe(Subscription subscription) {
            if (set(subscription)) {
                subscription.request(Long.MAX_VALUE);
            }
        }

        public void onNext(U u) {
            this.main.starterNext(u);
        }

        public void onError(Throwable th) {
            this.main.starterError(th);
        }

        public void onComplete() {
            this.main.starterComplete();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the operator.
     *
     * @param source                 the publisher whose elements are split into windows
     * @param start                  publisher whose elements each request a new window
     * @param end                    maps a start element to the publisher that closes its window
     * @param drainQueueSupplier     supplies the queue used to serialize all signals per subscriber
     * @param processorQueueSupplier supplies the backing queue of each window
     * @throws NullPointerException if {@code start}, {@code end} or either supplier is {@code null}
     */
    public FluxWindowStartEnd(Publisher<? extends T> source, Publisher<U> start, Function<? super U, ? extends Publisher<V>> end, Supplier<? extends Queue<Object>> drainQueueSupplier, Supplier<? extends Queue<T>> processorQueueSupplier) {
        super(source);
        this.start = Objects.requireNonNull(start, "start");
        this.end = Objects.requireNonNull(end, "end");
        this.drainQueueSupplier = Objects.requireNonNull(drainQueueSupplier, "drainQueueSupplier");
        this.processorQueueSupplier = Objects.requireNonNull(processorQueueSupplier, "processorQueueSupplier");
    }

    /**
     * Reports the prefetch amount of this operator.
     *
     * @return {@link Integer#MAX_VALUE} (the main subscriber requests {@code Long.MAX_VALUE} from its sources)
     */
    @Override // reactor.core.publisher.Flux
    public long getPrefetch() {
        return Integer.MAX_VALUE;
    }

    /**
     * Wires a new subscriber: creates the coordinating main subscriber, hands it downstream as
     * the {@code Subscription}, then subscribes its starter to the start publisher and finally
     * the main subscriber itself to the source.
     *
     * @param subscriber the downstream subscriber that will receive the windows
     */
    @Override // reactor.core.publisher.FluxSource
    public void subscribe(Subscriber<? super Flux<T>> subscriber) {
        Queue<Object> drainQueue = this.drainQueueSupplier.get();
        WindowStartEndMainSubscriber<T, U, V> main =
                new WindowStartEndMainSubscriber<>(subscriber, drainQueue, this.end, this.processorQueueSupplier);

        subscriber.onSubscribe(main);

        // Start publisher first, so window openings are observed before source elements flow.
        this.start.subscribe(main.starter);
        this.source.subscribe(main);
    }
}
