package reactor.core.reactivestreams;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.support.Assert;
import reactor.core.support.Exceptions;
import reactor.core.support.SpecificationExceptions;
import reactor.fn.BiConsumer;
import reactor.fn.Consumer;
import reactor.fn.Function;

/* loaded from: input_file:reactor/core/reactivestreams/PublisherFactory.class */
/**
 * A {@link Publisher} built from user callbacks: a request callback invoked on each
 * {@link Subscription#request(long)}, an optional factory that creates a per-subscriber context,
 * and an optional shutdown callback that receives that context on termination.
 *
 * @param <T> the type of the values signalled to the subscriber
 * @param <C> the type of the per-subscriber context produced by {@link #contextFactory}
 */
public class PublisherFactory<T, C> implements Publisher<T> {
    /** Creates the context for each new subscriber; may be {@code null}, in which case the context is {@code null}. */
    protected final Function<Subscriber<? super T>, C> contextFactory;

    /** Invoked with the requested amount and the subscriber wrapper on every valid request; never {@code null}. */
    protected final BiConsumer<Long, SubscriberWithContext<T, C>> requestConsumer;

    /** Invoked with the context when the subscription terminates; may be {@code null} (no shutdown hook). */
    protected final Consumer<C> shutdownConsumer;

    /**
     * Adapts a "produce one signal" {@link Consumer} to the request-based {@link BiConsumer} contract:
     * the delegate is invoked once per requested element until the demand is served or the
     * subscriber is cancelled.
     *
     * <p>Outstanding demand is tracked in {@link #pending} through an atomic field updater so that
     * requests arriving while a drain loop is running are added to the counter rather than starting
     * a second loop.
     *
     * @param <T> the type of the values signalled to the subscriber
     * @param <C> the type of the per-subscriber context
     */
    public static final class ForEachBiConsumer<T, C> implements BiConsumer<Long, SubscriberWithContext<T, C>> {
        private final Consumer<SubscriberWithContext<T, C>> requestConsumer;

        /** Outstanding demand; {@code Long.MAX_VALUE} makes further {@link #accept} calls return immediately. */
        private volatile long pending = 0;

        // A class literal cannot carry type arguments, hence the raw type parameter.
        @SuppressWarnings("rawtypes")
        private static final AtomicLongFieldUpdater<ForEachBiConsumer> PENDING_UPDATER = AtomicLongFieldUpdater.newUpdater(ForEachBiConsumer.class, "pending");

        /**
         * @param consumer the callback invoked once per requested element
         */
        public ForEachBiConsumer(Consumer<SubscriberWithContext<T, C>> consumer) {
            this.requestConsumer = consumer;
        }

        /**
         * Registers {@code l} additional requested elements and, unless another invocation already
         * owns the drain loop, invokes the delegate once per requested element.
         *
         * @param l the number of elements requested
         * @param subscriberWithContext the subscriber wrapper handed to the delegate; its
         *        cancellation state is polled between emissions
         */
        @Override // reactor.fn.BiConsumer
        public void accept(Long l, SubscriberWithContext<T, C> subscriberWithContext) {
            if (this.pending == Long.MAX_VALUE) {
                return;
            }
            long demand = l.longValue();
            if (!PENDING_UPDATER.compareAndSet(this, 0L, demand)) {
                // The counter was not zero, so demand was already outstanding: add ours to it.
                long afterAdd = PENDING_UPDATER.addAndGet(this, demand);
                // Leave unless the counter happened to equal exactly our demand, or the addition
                // overflowed (negative) and this call managed to pin the counter at Long.MAX_VALUE.
                if (afterAdd != demand && (afterAdd >= 0 || !PENDING_UPDATER.compareAndSet(this, afterAdd, Long.MAX_VALUE))) {
                    return;
                }
            }
            do {
                long emitted = 0;
                // Emit until the current demand is served (Long.MAX_VALUE: until cancelled).
                // The loop condition is what terminates the drain; without it this would spin forever.
                while ((emitted++ < demand || demand == Long.MAX_VALUE) && !subscriberWithContext.isCancelled()) {
                    this.requestConsumer.accept(subscriberWithContext);
                }
                // Subtract what this pass handled; whatever remains was added concurrently.
                demand = PENDING_UPDATER.addAndGet(this, -demand);
            } while (demand > 0 && !subscriberWithContext.isCancelled());
        }
    }

    /**
     * The {@link Subscription} handed to the downstream subscriber. It extends
     * {@link SubscriberWithContext} so that the same object can be passed to the request callback
     * as the target for signals.
     *
     * <p>Terminal transitions go through {@code TERMINAL_UPDATER.compareAndSet(this, 0, 1)}
     * (inherited; presumably 0 = active, 1 = terminated - confirm in SubscriberWithContext), so
     * the shutdown callback is triggered by at most one of cancel/onError/onComplete.
     */
    private static final class SubscriberProxy<T, C> extends SubscriberWithContext<T, C> implements Subscription {
        private final BiConsumer<Long, SubscriberWithContext<T, C>> requestConsumer;
        private final Consumer<C> shutdownConsumer;

        public SubscriberProxy(Subscriber<? super T> subscriber, C c, BiConsumer<Long, SubscriberWithContext<T, C>> biConsumer, Consumer<C> consumer) {
            super(c, subscriber);
            this.requestConsumer = biConsumer;
            this.shutdownConsumer = consumer;
        }

        /**
         * Forwards a demand of {@code j} elements to the request callback. Ignored once cancelled;
         * a non-positive amount is reported through {@link #onError(Throwable)}, as is anything
         * thrown by the callback.
         */
        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            if (isCancelled()) {
                return;
            }
            if (j <= 0) {
                onError(SpecificationExceptions.spec_3_09_exception(j));
                return;
            }
            try {
                this.requestConsumer.accept(Long.valueOf(j), this);
            } catch (Throwable th) {
                onError(th);
            }
        }

        /** Marks the subscription terminated and runs the shutdown callback, at most once. */
        @Override // org.reactivestreams.Subscription
        public void cancel() {
            if (TERMINAL_UPDATER.compareAndSet(this, 0, 1)) {
                doShutdown();
            }
        }

        /** Terminates with an error: runs the shutdown callback, then signals the delegate subscriber. */
        @Override // reactor.core.reactivestreams.SubscriberWithContext
        public void onError(Throwable th) {
            if (TERMINAL_UPDATER.compareAndSet(this, 0, 1)) {
                doShutdown();
                this.subscriber.onError(th);
            }
        }

        /**
         * Terminates normally: runs the shutdown callback, then signals completion. An exception
         * thrown by the delegate's {@code onComplete} is routed to its {@code onError}.
         */
        @Override // reactor.core.reactivestreams.SubscriberWithContext
        public void onComplete() {
            if (TERMINAL_UPDATER.compareAndSet(this, 0, 1)) {
                doShutdown();
                try {
                    this.subscriber.onComplete();
                } catch (Throwable th) {
                    this.subscriber.onError(th);
                }
            }
        }

        /**
         * Runs the shutdown callback with the context, if a callback was provided.
         *
         * <p>NOTE(review): a failing callback is reported via {@code subscriber.onError}, and the
         * callers above still deliver their own terminal signal afterwards, so the delegate can
         * receive two terminal signals (or an error after cancel). Left as-is to preserve
         * behaviour; confirm whether this is intended.
         */
        private void doShutdown() {
            if (this.shutdownConsumer == null) {
                return;
            }
            try {
                this.shutdownConsumer.accept(this.context);
            } catch (Throwable th) {
                this.subscriber.onError(th);
            }
        }

        /** Always rejects: this proxy is itself the subscription given to the delegate. */
        @Override // reactor.core.reactivestreams.SubscriberWithContext
        public void onSubscribe(Subscription subscription) {
            throw new UnsupportedOperationException(" the delegate subscriber is already subscribed");
        }
    }

    /**
     * Creates a publisher driven by a request callback, with no context and no shutdown hook.
     *
     * @param biConsumer called with the requested amount and the subscriber wrapper
     * @param <T> the type of the values signalled to the subscriber
     * @return a new publisher
     */
    public static <T> Publisher<T> create(BiConsumer<Long, SubscriberWithContext<T, Void>> biConsumer) {
        return create(biConsumer, null, null);
    }

    /**
     * Creates a publisher driven by a request callback, with a per-subscriber context.
     *
     * @param biConsumer called with the requested amount and the subscriber wrapper
     * @param function creates the context for each subscriber; may be {@code null}
     * @param <T> the type of the values signalled to the subscriber
     * @param <C> the context type
     * @return a new publisher
     */
    public static <T, C> Publisher<T> create(BiConsumer<Long, SubscriberWithContext<T, C>> biConsumer, Function<Subscriber<? super T>, C> function) {
        return create(biConsumer, function, null);
    }

    /**
     * Creates a publisher driven by a request callback, with a per-subscriber context and a
     * shutdown hook.
     *
     * @param biConsumer called with the requested amount and the subscriber wrapper
     * @param function creates the context for each subscriber; may be {@code null}
     * @param consumer receives the context when the subscription terminates; may be {@code null}
     * @param <T> the type of the values signalled to the subscriber
     * @param <C> the context type
     * @return a new publisher
     */
    public static <T, C> Publisher<T> create(BiConsumer<Long, SubscriberWithContext<T, C>> biConsumer, Function<Subscriber<? super T>, C> function, Consumer<C> consumer) {
        return new PublisherFactory<T, C>(biConsumer, function, consumer);
    }

    /**
     * Creates a publisher whose callback is invoked once per requested element, with no context
     * and no shutdown hook.
     *
     * @param consumer called once per requested element with the subscriber wrapper
     * @param <T> the type of the values signalled to the subscriber
     * @return a new publisher
     */
    public static <T> Publisher<T> forEach(Consumer<SubscriberWithContext<T, Void>> consumer) {
        return forEach(consumer, null, null);
    }

    /**
     * Creates a publisher whose callback is invoked once per requested element, with a
     * per-subscriber context.
     *
     * @param consumer called once per requested element with the subscriber wrapper
     * @param function creates the context for each subscriber; may be {@code null}
     * @param <T> the type of the values signalled to the subscriber
     * @param <C> the context type
     * @return a new publisher
     */
    public static <T, C> Publisher<T> forEach(Consumer<SubscriberWithContext<T, C>> consumer, Function<Subscriber<? super T>, C> function) {
        return forEach(consumer, function, null);
    }

    /**
     * Creates a publisher whose callback is invoked once per requested element, with a
     * per-subscriber context and a shutdown hook. The callback is wrapped in a
     * {@link ForEachBiConsumer}.
     *
     * @param consumer called once per requested element; checked with {@code Assert.notNull}
     * @param function creates the context for each subscriber; may be {@code null}
     * @param consumer2 receives the context when the subscription terminates; may be {@code null}
     * @param <T> the type of the values signalled to the subscriber
     * @param <C> the context type
     * @return a new publisher
     */
    public static <T, C> Publisher<T> forEach(Consumer<SubscriberWithContext<T, C>> consumer, Function<Subscriber<? super T>, C> function, Consumer<C> consumer2) {
        Assert.notNull(consumer, "A data producer must be provided");
        return create(new ForEachBiConsumer<T, C>(consumer), function, consumer2);
    }

    /**
     * @param biConsumer the request callback; checked with {@code Assert.notNull}
     * @param function the context factory; may be {@code null}
     * @param consumer the shutdown callback; may be {@code null}
     */
    protected PublisherFactory(BiConsumer<Long, SubscriberWithContext<T, C>> biConsumer, Function<Subscriber<? super T>, C> function, Consumer<C> consumer) {
        Assert.notNull(biConsumer, "A data producer must be provided");
        this.requestConsumer = biConsumer;
        this.contextFactory = function;
        this.shutdownConsumer = consumer;
    }

    /**
     * Builds the context for {@code subscriber} (if a factory is configured) and hands the
     * subscriber a new subscription. Anything thrown while doing so is passed to
     * {@code Exceptions.throwIfFatal} and then reported through {@code subscriber.onError}.
     *
     * @param subscriber the subscriber to attach
     */
    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super T> subscriber) {
        try {
            C context = this.contextFactory != null ? this.contextFactory.apply(subscriber) : null;
            subscriber.onSubscribe(new SubscriberProxy<T, C>(subscriber, context, this.requestConsumer, this.shutdownConsumer));
        } catch (Throwable th) {
            Exceptions.throwIfFatal(th);
            subscriber.onError(th);
        }
    }
}
