/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.body.stream;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;
import io.micronaut.http.body.ByteBody;
import io.micronaut.http.body.stream.BodySizeLimits;
import io.micronaut.http.body.stream.BufferConsumer;
import io.micronaut.http.exceptions.BufferLengthExceededException;
import io.micronaut.http.exceptions.ContentLengthExceededException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Contract;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Internal
public abstract class BaseSharedBuffer<C extends BufferConsumer, F> {
    private static final Class<ByteBody> SPLIT_LOG_CLASS = ByteBody.class;
    private static final Logger SPLIT_LOG = LoggerFactory.getLogger(SPLIT_LOG_CLASS);
    private final BodySizeLimits limits;
    private final BufferConsumer.Upstream rootUpstream;
    private boolean complete;
    private Throwable error;
    private int reserved = 1;
    private List<@NonNull C> subscribers;
    private List<@NonNull DelayedExecutionFlow<F>> fullSubscribers;
    private boolean working = false;
    private long lengthSoFar = 0L;
    private volatile long expectedLength = -1L;

    public BaseSharedBuffer(BodySizeLimits limits, BufferConsumer.Upstream rootUpstream) {
        this.limits = limits;
        this.rootUpstream = rootUpstream;
    }

    @Contract(value="-> fail")
    public static void failClaim() {
        throw new IllegalStateException("Request body has already been claimed: Two conflicting sites are trying to access the request body. If this is intentional, the first user must ByteBody#split the body. To find out where the body was claimed, turn on TRACE logging for " + SPLIT_LOG_CLASS.getName() + ".");
    }

    public static void logClaim() {
        if (SPLIT_LOG.isTraceEnabled()) {
            SPLIT_LOG.trace("Body split at this location. This is not an error, but may aid in debugging other errors", (Throwable)new Exception());
        }
    }

    public final OptionalLong getExpectedLength() {
        long l = this.expectedLength;
        return l < 0L ? OptionalLong.empty() : OptionalLong.of(l);
    }

    public final BodySizeLimits getLimits() {
        return this.limits;
    }

    public final BufferConsumer.Upstream getRootUpstream() {
        return this.rootUpstream;
    }

    public final void setExpectedLengthFrom(String contentLength) {
        long parsed;
        if (contentLength == null) {
            return;
        }
        try {
            parsed = Long.parseLong(contentLength);
        }
        catch (NumberFormatException e) {
            return;
        }
        if (parsed < 0L) {
            return;
        }
        if (parsed > this.limits.maxBodySize()) {
            this.error(new ContentLengthExceededException(this.limits.maxBodySize(), parsed));
        }
        this.setExpectedLength(parsed);
    }

    public final void setExpectedLength(long length) {
        if (length < 0L) {
            throw new IllegalArgumentException("Should be > 0");
        }
        this.expectedLength = length;
    }

    protected void reserve0() {
        if (this.reserved == 0) {
            throw new IllegalStateException("Cannot go from streaming state back to buffering state");
        }
        ++this.reserved;
    }

    protected abstract void forwardInitialBuffer(@Nullable C var1, boolean var2);

    protected void afterSubscribe(boolean last) {
    }

    protected abstract F subscribeFullResult(boolean var1);

    protected final void subscribe0(@Nullable C subscriber, BufferConsumer.Upstream specificUpstream) {
        boolean last;
        assert (!this.working);
        if (this.reserved == 0) {
            throw new IllegalStateException("Need to reserve a spot first");
        }
        this.working = true;
        boolean bl = last = --this.reserved == 0;
        if (subscriber != null) {
            if (this.subscribers == null) {
                this.subscribers = new ArrayList<C>(1);
            }
            this.subscribers.add(subscriber);
            this.forwardInitialBuffer(subscriber, last);
            if (this.error != null) {
                subscriber.error(this.error);
            } else if (this.lengthSoFar > this.limits.maxBufferSize()) {
                subscriber.error(new BufferLengthExceededException(this.limits.maxBufferSize(), this.lengthSoFar));
                specificUpstream.allowDiscard();
            }
            if (this.complete) {
                subscriber.complete();
            }
        } else {
            this.forwardInitialBuffer(null, last);
        }
        this.afterSubscribe(last);
        this.working = false;
    }

    protected final ExecutionFlow<F> subscribeFull0(DelayedExecutionFlow<F> targetFlow, BufferConsumer.Upstream specificUpstream, boolean canReturnImmediate) {
        assert (!this.working);
        if (this.reserved <= 0) {
            throw new IllegalStateException("Need to reserve a spot first. This should not happen, StreamingNettyByteBody should guard against it");
        }
        ExecutionFlow ret = targetFlow;
        this.working = true;
        boolean last = --this.reserved == 0;
        Throwable error = this.error;
        if (error == null && this.lengthSoFar > this.limits.maxBufferSize()) {
            error = new BufferLengthExceededException(this.limits.maxBufferSize(), this.lengthSoFar);
            specificUpstream.allowDiscard();
        }
        if (error != null) {
            if (canReturnImmediate) {
                ret = ExecutionFlow.error((Throwable)error);
            } else {
                targetFlow.completeExceptionally(error);
            }
        } else if (this.complete) {
            F buf = this.subscribeFullResult(last);
            if (canReturnImmediate) {
                ret = ExecutionFlow.just(buf);
            } else {
                targetFlow.complete(buf);
            }
        } else {
            if (this.fullSubscribers == null) {
                this.fullSubscribers = new ArrayList<DelayedExecutionFlow<F>>(1);
            }
            this.fullSubscribers.add((DelayedExecutionFlow<F>)targetFlow);
        }
        this.afterSubscribe(last);
        this.working = false;
        return ret;
    }

    protected abstract void addForward(List<C> var1);

    protected void addDoNotBuffer() {
    }

    protected abstract void addBuffer();

    protected abstract void discardBuffer();

    protected final void add(int n) {
        assert (!this.working);
        long newLength = this.lengthSoFar + (long)n;
        long expectedLength = this.expectedLength;
        if (expectedLength != -1L && newLength > expectedLength) {
            throw new IllegalStateException("Received more bytes than specified by Content-Length");
        }
        this.lengthSoFar = newLength;
        if (this.complete || this.error != null) {
            this.addDoNotBuffer();
            return;
        }
        if (newLength > this.limits.maxBodySize()) {
            this.addDoNotBuffer();
            this.error(new ContentLengthExceededException(this.limits.maxBodySize(), newLength));
            this.rootUpstream.allowDiscard();
            return;
        }
        this.working = true;
        if (this.subscribers != null) {
            this.addForward(this.subscribers);
        }
        if (this.reserved > 0 || this.fullSubscribers != null) {
            if (newLength > this.limits.maxBufferSize()) {
                this.addDoNotBuffer();
                this.discardBuffer();
                if (this.fullSubscribers != null) {
                    BufferLengthExceededException e = new BufferLengthExceededException(this.limits.maxBufferSize(), this.lengthSoFar);
                    for (DelayedExecutionFlow<F> fullSubscriber : this.fullSubscribers) {
                        fullSubscriber.completeExceptionally((Throwable)e);
                    }
                }
            } else {
                this.addBuffer();
            }
        } else {
            this.addDoNotBuffer();
        }
        this.working = false;
    }

    public void complete() {
        if (this.expectedLength > this.lengthSoFar) {
            throw new IllegalStateException("Received fewer bytes than specified by Content-Length");
        }
        this.complete = true;
        this.expectedLength = this.lengthSoFar;
        if (this.subscribers != null) {
            for (BufferConsumer subscriber : this.subscribers) {
                subscriber.complete();
            }
        }
        if (this.fullSubscribers != null) {
            boolean last = this.reserved <= 0;
            Iterator<DelayedExecutionFlow<F>> iterator = this.fullSubscribers.iterator();
            while (iterator.hasNext()) {
                DelayedExecutionFlow<F> fullSubscriber = iterator.next();
                fullSubscriber.complete(this.subscribeFullResult(last && !iterator.hasNext()));
            }
        }
    }

    public void error(Throwable e) {
        if (this.error != null) {
            this.error.addSuppressed(e);
            return;
        }
        this.error = e;
        this.discardBuffer();
        if (this.subscribers != null) {
            for (BufferConsumer bufferConsumer : this.subscribers) {
                bufferConsumer.error(e);
            }
        }
        if (this.fullSubscribers != null) {
            for (DelayedExecutionFlow delayedExecutionFlow : this.fullSubscribers) {
                delayedExecutionFlow.completeExceptionally(e);
            }
        }
    }

    public static abstract class AsFlux<B>
    implements BufferConsumer {
        private final BaseSharedBuffer<?, ?> sharedBuffer;
        private final AtomicLong unconsumed = new AtomicLong(0L);
        private final Sinks.Many<B> sink = Sinks.many().unicast().onBackpressureBuffer();

        public AsFlux(BaseSharedBuffer<?, ?> sharedBuffer) {
            this.sharedBuffer = sharedBuffer;
        }

        protected abstract int size(B var1);

        public final boolean add0(B buf) {
            long newLength = this.unconsumed.addAndGet(this.size(buf));
            if (newLength > this.sharedBuffer.getLimits().maxBufferSize()) {
                this.sink.tryEmitError((Throwable)new BufferLengthExceededException(this.sharedBuffer.getLimits().maxBufferSize(), newLength));
                return false;
            }
            return this.sink.tryEmitNext(buf) == Sinks.EmitResult.OK;
        }

        @Override
        public final void complete() {
            this.sink.tryEmitComplete();
        }

        @Override
        public final void error(Throwable e) {
            this.sink.tryEmitError(e);
        }

        public final Flux<B> asFlux(BufferConsumer.Upstream upstream) {
            return this.sink.asFlux().doOnSubscribe(s -> upstream.start()).doOnNext(bb -> {
                int size = this.size(bb);
                this.unconsumed.addAndGet(-size);
                upstream.onBytesConsumed(size);
            }).doOnCancel(() -> {
                upstream.allowDiscard();
                upstream.disregardBackpressure();
            });
        }
    }
}

