/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.commons.reactive;

import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.LongConsumer;
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.infinispan.commons.logging.Log;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.reactivestreams.Publisher;

public abstract class AbstractAsyncPublisherHandler<Target, Output, InitialResponse, NextResponse>
implements LongConsumer,
Action {
    protected static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    protected final int batchSize;
    protected final Supplier<Target> supplier;
    private final FlowableProcessor<Output> flowableProcessor;
    private final AtomicLong requestedAmount = new AtomicLong();
    private volatile Target currentTarget;
    private volatile boolean cancelled;
    private volatile boolean alreadyCreated;
    private volatile boolean started = false;
    private final InitialBiConsumer initialBiConsumer = new InitialBiConsumer();
    private final NextBiConsumer nextBiConsumer = new NextBiConsumer();

    protected AbstractAsyncPublisherHandler(int maxBatchSize, Supplier<Target> supplier, Target firstTarget) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize  must be greater than 0");
        }
        this.batchSize = Math.min(maxBatchSize, 65536);
        this.supplier = supplier;
        this.flowableProcessor = UnicastProcessor.create((int)this.batchSize);
        this.currentTarget = firstTarget;
    }

    public Publisher<Output> startPublisher() {
        if (this.started) {
            throw new IllegalStateException("Publisher was already started!");
        }
        this.started = true;
        return this.flowableProcessor.doOnLifecycle(RxJavaInterop.emptyConsumer(), (LongConsumer)this, (Action)this);
    }

    public void run() {
        Target target;
        this.cancelled = true;
        if (this.alreadyCreated && (target = this.currentTarget) != null) {
            this.sendCancel(target);
        }
    }

    protected abstract void sendCancel(Target var1);

    protected abstract CompletionStage<InitialResponse> sendInitialCommand(Target var1, int var2);

    protected abstract CompletionStage<NextResponse> sendNextCommand(Target var1, int var2);

    protected abstract long handleInitialResponse(InitialResponse var1, Target var2);

    protected abstract long handleNextResponse(NextResponse var1, Target var2);

    protected void handleThrowableInResponse(Throwable t, Target target) {
        this.flowableProcessor.onError(t);
    }

    public void accept(long count) {
        if (this.shouldSubmit(count)) {
            if (this.checkCancelled()) {
                return;
            }
            Target target = this.currentTarget;
            if (target == null) {
                this.alreadyCreated = false;
                target = this.supplier.get();
                if (target == null) {
                    if (log.isTraceEnabled()) {
                        log.tracef("Completing processor %s", this.flowableProcessor);
                    }
                    this.flowableProcessor.onComplete();
                    return;
                }
                this.currentTarget = target;
            }
            try {
                if (this.alreadyCreated) {
                    CompletionStage<NextResponse> stage = this.sendNextCommand(target, this.batchSize);
                    stage.whenComplete(this.nextBiConsumer);
                } else {
                    this.alreadyCreated = true;
                    CompletionStage<InitialResponse> stage = this.sendInitialCommand(target, this.batchSize);
                    stage.whenComplete(this.initialBiConsumer);
                }
            }
            catch (Throwable t) {
                this.handleThrowableInResponse(t, target);
            }
        }
    }

    protected boolean onNext(Output value) {
        if (this.checkCancelled()) {
            return false;
        }
        this.flowableProcessor.onNext(value);
        return true;
    }

    protected void targetComplete() {
        this.currentTarget = null;
    }

    private boolean shouldSubmit(long count) {
        long newValue;
        long prev;
        while (!this.requestedAmount.compareAndSet(prev = this.requestedAmount.get(), newValue = prev + count)) {
        }
        return newValue > 0L && (prev <= 0L || count <= 0L);
    }

    protected boolean checkCancelled() {
        if (this.cancelled) {
            if (log.isTraceEnabled()) {
                log.tracef("Subscription %s was cancelled, terminating early", this);
            }
            return true;
        }
        return false;
    }

    private class InitialBiConsumer
    extends ResponseConsumer<InitialResponse> {
        private InitialBiConsumer() {
        }

        @Override
        long handleResponse(InitialResponse response, Target target) {
            return AbstractAsyncPublisherHandler.this.handleInitialResponse(response, target);
        }
    }

    private class NextBiConsumer
    extends ResponseConsumer<NextResponse> {
        private NextBiConsumer() {
        }

        @Override
        long handleResponse(NextResponse response, Target target) {
            return AbstractAsyncPublisherHandler.this.handleNextResponse(response, target);
        }
    }

    private abstract class ResponseConsumer<Type>
    implements BiConsumer<Type, Throwable> {
        private ResponseConsumer() {
        }

        @Override
        public void accept(Type response, Throwable throwable) {
            if (throwable != null) {
                AbstractAsyncPublisherHandler.this.handleThrowableInResponse(throwable, AbstractAsyncPublisherHandler.this.currentTarget);
                return;
            }
            try {
                long produced = this.handleResponse(response, AbstractAsyncPublisherHandler.this.currentTarget);
                AbstractAsyncPublisherHandler.this.accept(-produced);
            }
            catch (Throwable innerT) {
                AbstractAsyncPublisherHandler.this.handleThrowableInResponse(innerT, AbstractAsyncPublisherHandler.this.currentTarget);
            }
        }

        abstract long handleResponse(Type var1, Target var2);
    }
}

