package io.micronaut.http.server.netty.body;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;
import io.micronaut.core.util.SupplierUtil;
import io.micronaut.http.body.ByteBody;
import io.micronaut.http.body.CloseableAvailableByteBody;
import io.micronaut.http.body.CloseableByteBody;
import io.micronaut.http.exceptions.BufferLengthExceededException;
import io.micronaut.http.exceptions.ContentLengthExceededException;
import io.micronaut.http.netty.EventLoopFlow;
import io.micronaut.http.netty.PublisherAsBlocking;
import io.micronaut.http.netty.PublisherAsStream;
import io.micronaut.http.server.netty.body.BufferConsumer;
import io.micronaut.http.server.netty.body.UpstreamBalancer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetectorFactory;
import io.netty.util.ResourceLeakTracker;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Internal
/* loaded from: input_file:io/micronaut/http/server/netty/body/StreamingNettyByteBody.class */
public final class StreamingNettyByteBody extends NettyByteBody implements CloseableByteBody {
    // State shared between this body and every body created from it via split().
    private final SharedBuffer sharedBuffer;
    // Upstream handle for this body. Set to null by primary(), bufferFlow() and close(),
    // which is how a second claim on the same body is detected.
    private BufferConsumer.Upstream upstream;

    /* loaded from: input_file:io/micronaut/http/server/netty/body/StreamingNettyByteBody$SharedBuffer.class */
    public static final class SharedBuffer implements BufferConsumer {
        // Memoized supplier of the leak detector used to create 'tracker' (assigned in the static initializer).
        private static final Supplier<ResourceLeakDetector<SharedBuffer>> LEAK_DETECTOR;
        // Wraps the event loop given to the constructor; reserve/subscribe operations are routed through it.
        private final EventLoopFlow eventLoopFlow;
        // Source of maxBodySize() and maxBufferSize().
        private final BodySizeLimits limits;
        // Upstream given to the constructor; add() calls allowDiscard() on it when maxBodySize is exceeded.
        private final BufferConsumer.Upstream rootUpstream;
        // Data retained for consumers that have reserved a spot but not subscribed yet, and for
        // full-body subscribers. Null when nothing is retained.
        private CompositeByteBuf buffer;
        // Set by complete().
        private boolean complete;
        // Set by error(); non-null means the body has failed.
        private Throwable error;
        // Streaming consumers registered through subscribe0(). Lazily created.
        private List<BufferConsumer> subscribers;
        // Flows registered through subscribeFull0() that are waiting for the complete body. Lazily created.
        private List<DelayedExecutionFlow<ByteBuf>> fullSubscribers;
        // Compiler-generated flag backing the original 'assert' statements (assigned in the static initializer).
        static final /* synthetic */ boolean $assertionsDisabled;

        // Leak tracker for this instance; may be null (hence the null checks before each use).
        @Nullable
        private final ResourceLeakTracker<SharedBuffer> tracker = LEAK_DETECTOR.get().track(this);
        // Number of reserved-but-not-yet-subscribed spots. Starts at 1, incremented by reserve0(),
        // decremented by subscribe0()/subscribeFull0().
        private int reserved = 1;
        // Set while subscribe0(), subscribeFull0() or add() is running; checked by their assertions.
        private boolean working = false;
        // Total number of readable bytes passed to add() so far.
        private long lengthSoFar = 0;
        // Expected total body length, or -1 while unknown. Read by StreamingNettyByteBody.expectedLength().
        private volatile long expectedLength = -1;

        /**
         * Creates a shared buffer with a single initial reservation.
         *
         * @param eventLoop      event loop wrapped in the {@link EventLoopFlow} used by this buffer
         * @param bodySizeLimits limits consulted for the maximum body and buffer sizes
         * @param upstream       root upstream; notified via {@code allowDiscard()} when the body size limit is exceeded
         */
        public SharedBuffer(EventLoop eventLoop, BodySizeLimits bodySizeLimits, BufferConsumer.Upstream upstream) {
            this.rootUpstream = upstream;
            this.limits = bodySizeLimits;
            this.eventLoopFlow = new EventLoopFlow(eventLoop);
        }

        /**
         * Reads the {@code Content-Length} header and, when it holds a non-negative number,
         * records it as the expected body length.
         *
         * <p>A missing, negative or unparseable header leaves the expected length untouched.
         * A value above {@code limits.maxBodySize()} fails the body with a
         * {@link ContentLengthExceededException}; the expected length is still recorded afterwards.
         *
         * @param httpHeaders headers to read {@code Content-Length} from
         */
        public void setExpectedLengthFrom(HttpHeaders httpHeaders) {
            final String headerValue = httpHeaders.get(HttpHeaderNames.CONTENT_LENGTH);
            if (headerValue != null) {
                try {
                    final long declaredLength = Long.parseLong(headerValue);
                    if (declaredLength >= 0) {
                        if (declaredLength > this.limits.maxBodySize()) {
                            error(new ContentLengthExceededException(this.limits.maxBodySize(), declaredLength));
                        }
                        setExpectedLength(declaredLength);
                    }
                } catch (NumberFormatException ignored) {
                    // Deliberately ignored: an unparseable header is treated like a missing one.
                }
            }
        }

        /**
         * Records the expected total length of the body.
         *
         * @param j expected length in bytes; zero is allowed
         * @throws IllegalArgumentException if {@code j} is negative
         */
        public void setExpectedLength(long j) {
            if (j < 0) {
                // Only negative values are rejected, so the message must say ">= 0" (0 is a valid length).
                throw new IllegalArgumentException("Should be >= 0");
            }
            this.expectedLength = j;
        }

        /**
         * Reserves a spot for a future subscriber.
         *
         * <p>The work is offered to the event loop flow; when {@code executeNow} returns
         * {@code true} the reservation is performed directly by the caller.
         * NOTE(review): presumably {@code executeNow} runs or schedules the task itself when it
         * returns {@code false} — confirm against {@code EventLoopFlow}.
         */
        void reserve() {
            final boolean runInline = this.eventLoopFlow.executeNow(this::reserve0);
            if (runInline) {
                reserve0();
            }
        }

        /**
         * Increments the reservation count and records the access on the leak tracker.
         *
         * @throws IllegalStateException if no reservation is outstanding anymore
         */
        private void reserve0() {
            if (this.reserved == 0) {
                throw new IllegalStateException("Cannot go from streaming state back to buffering state");
            }
            this.reserved += 1;
            final ResourceLeakTracker<SharedBuffer> leakTracker = this.tracker;
            if (leakTracker != null) {
                leakTracker.record();
            }
        }

        /**
         * Turns a reservation into a streaming subscription, or simply gives the reservation
         * up when {@code bufferConsumer} is {@code null}.
         *
         * <p>The work is offered to the event loop flow; when {@code executeNow} returns
         * {@code true} it is performed directly by the caller.
         *
         * @param bufferConsumer consumer to register, or {@code null} to only release the reservation
         * @param upstream       upstream belonging to the subscribing body
         */
        void subscribe(@Nullable BufferConsumer bufferConsumer, BufferConsumer.Upstream upstream) {
            final boolean runInline = this.eventLoopFlow.executeNow(() -> subscribe0(bufferConsumer, upstream));
            if (runInline) {
                subscribe0(bufferConsumer, upstream);
            }
        }

        /**
         * Consumes one reservation and, if a consumer is given, registers it as a streaming
         * subscriber and replays the current state (buffered data, error, completion) to it.
         *
         * @param bufferConsumer consumer to register, or {@code null} to only give up the reservation
         * @param upstream       upstream of the subscribing body; told to allow discarding when the
         *                       buffer limit has already been exceeded
         * @throws IllegalStateException if there is no reservation left to consume
         */
        private void subscribe0(@Nullable BufferConsumer bufferConsumer, BufferConsumer.Upstream upstream) {
            // Decompiled form of "assert !working": this method must not be re-entered.
            if (!$assertionsDisabled && this.working) {
                throw new AssertionError();
            }
            if (this.reserved == 0) {
                throw new IllegalStateException("Need to reserve a spot first");
            }
            this.working = true;
            int i = this.reserved - 1;
            this.reserved = i;
            // z: this call consumed the last outstanding reservation.
            boolean z = i == 0;
            if (bufferConsumer != null) {
                if (this.subscribers == null) {
                    this.subscribers = new ArrayList(1);
                }
                this.subscribers.add(bufferConsumer);
                if (this.buffer != null) {
                    if (z) {
                        // Last reservation: hand the buffered data over without retaining and
                        // drop our reference to the buffer.
                        bufferConsumer.add(this.buffer.slice());
                        this.buffer = null;
                    } else {
                        // Other reservations remain: keep the buffer and give the consumer a retained view.
                        bufferConsumer.add(this.buffer.retainedSlice());
                    }
                }
                if (this.error != null) {
                    // Replay a failure that happened before this consumer subscribed.
                    bufferConsumer.error(this.error);
                } else if (this.lengthSoFar > this.limits.maxBufferSize()) {
                    // More data was received than may be buffered; fail this late subscriber.
                    bufferConsumer.error(new BufferLengthExceededException(this.limits.maxBufferSize(), this.lengthSoFar));
                    upstream.allowDiscard();
                }
                if (this.complete) {
                    bufferConsumer.complete();
                }
            } else if (this.buffer != null && z) {
                // Reservation given up without a consumer and nobody else is waiting: free the data.
                this.buffer.release();
                this.buffer = null;
            }
            if (this.tracker != null) {
                if (z) {
                    // No reservations left: stop leak tracking for this buffer.
                    this.tracker.close(this);
                } else {
                    this.tracker.record();
                }
            }
            this.working = false;
        }

        /**
         * Turns a reservation into a subscription for the fully buffered body.
         *
         * <p>When {@code executeNow} returns {@code true} the subscription is performed directly
         * and may yield an already-completed flow. Otherwise the pre-created delayed flow is
         * returned; the task handed to {@code executeNow} subscribes with that same flow.
         *
         * @param upstream upstream belonging to the subscribing body
         * @return flow that yields the complete body or fails
         */
        ExecutionFlow<ByteBuf> subscribeFull(BufferConsumer.Upstream upstream) {
            final DelayedExecutionFlow<ByteBuf> pending = DelayedExecutionFlow.create();
            final boolean runInline = this.eventLoopFlow.executeNow(() -> {
                final ExecutionFlow<ByteBuf> outcome = subscribeFull0(pending, upstream, false);
                // Decompiled form of "assert outcome == pending".
                if (!$assertionsDisabled && outcome != pending) {
                    throw new AssertionError();
                }
            });
            if (runInline) {
                return subscribeFull0(pending, upstream, true);
            }
            return pending;
        }

        /**
         * Consumes one reservation and resolves it into a flow for the complete body.
         *
         * <p>Outcomes, in order of precedence:
         * <ul>
         *   <li>a recorded error, or more received bytes than {@code limits.maxBufferSize()}: the flow fails;</li>
         *   <li>the body is already complete: the flow yields the buffered data (or an empty buffer);</li>
         *   <li>otherwise {@code delayedExecutionFlow} is queued and completed later.</li>
         * </ul>
         *
         * @param delayedExecutionFlow flow to complete when the result cannot be returned immediately
         * @param upstream             upstream of the subscribing body; told to allow discarding when
         *                             the buffer limit has already been exceeded
         * @param z                    whether an already-completed flow may be returned instead of
         *                             completing {@code delayedExecutionFlow}
         * @return {@code delayedExecutionFlow}, or an immediate flow when {@code z} is {@code true}
         *         and the outcome is already known
         * @throws IllegalStateException if there is no reservation left to consume
         */
        private ExecutionFlow<ByteBuf> subscribeFull0(DelayedExecutionFlow<ByteBuf> delayedExecutionFlow, BufferConsumer.Upstream upstream, boolean z) {
            // Decompiled form of "assert !working": this method must not be re-entered.
            if (!$assertionsDisabled && this.working) {
                throw new AssertionError();
            }
            if (this.reserved <= 0) {
                throw new IllegalStateException("Need to reserve a spot first. This should not happen, StreamingNettyByteBody should guard against it");
            }
            // Typed as ExecutionFlow because it may hold either the delayed flow or an immediate one.
            ExecutionFlow<ByteBuf> result = delayedExecutionFlow;
            this.working = true;
            int remaining = this.reserved - 1;
            this.reserved = remaining;
            final boolean last = remaining == 0;
            // Typed as Throwable: it carries either the recorded error or a new buffer-limit failure.
            Throwable failure = this.error;
            if (failure == null && this.lengthSoFar > this.limits.maxBufferSize()) {
                failure = new BufferLengthExceededException(this.limits.maxBufferSize(), this.lengthSoFar);
                upstream.allowDiscard();
            }
            if (failure != null) {
                if (z) {
                    result = ExecutionFlow.error(failure);
                } else {
                    delayedExecutionFlow.completeExceptionally(failure);
                }
            } else if (this.complete) {
                final ByteBuf body;
                if (this.buffer == null) {
                    body = Unpooled.EMPTY_BUFFER;
                } else if (last) {
                    // Last reservation: hand over the buffer itself and drop our reference.
                    body = this.buffer;
                    this.buffer = null;
                } else {
                    // Other reservations remain: keep the buffer and hand out a retained view.
                    body = this.buffer.retainedSlice();
                }
                if (z) {
                    result = ExecutionFlow.just(body);
                } else {
                    delayedExecutionFlow.complete(body);
                }
            } else {
                // Body still in flight: complete()/error()/add() will resolve the flow later.
                if (this.fullSubscribers == null) {
                    this.fullSubscribers = new ArrayList<>(1);
                }
                this.fullSubscribers.add(delayedExecutionFlow);
            }
            if (this.tracker != null) {
                if (last) {
                    // No reservations left: stop leak tracking for this buffer.
                    this.tracker.close(this);
                } else {
                    this.tracker.record();
                }
            }
            this.working = false;
            return result;
        }

        /**
         * Receives the next chunk of the body, forwards it to every streaming subscriber and,
         * while reservations or full-body subscribers exist, retains it in the shared buffer.
         *
         * <p>The chunk is released instead when the body is already complete or failed, when the
         * total length exceeds {@code limits.maxBodySize()} (which also fails the body), or when
         * nobody needs it buffered.
         *
         * @param byteBuf chunk to add; this method takes over the caller's reference
         */
        @Override // io.micronaut.http.server.netty.body.BufferConsumer
        public void add(ByteBuf byteBuf) {
            // Decompiled form of "assert !working": this method must not be re-entered.
            if (!$assertionsDisabled && this.working) {
                throw new AssertionError();
            }
            byteBuf.touch();
            if (this.complete || this.error != null) {
                // Data arriving after completion or failure is dropped.
                byteBuf.release();
                return;
            }
            long readableBytes = this.lengthSoFar + byteBuf.readableBytes();
            this.lengthSoFar = readableBytes;
            if (readableBytes > this.limits.maxBodySize()) {
                // Hard body limit exceeded: fail all subscribers and let the upstream discard the rest.
                byteBuf.release();
                error(new ContentLengthExceededException(this.limits.maxBodySize(), readableBytes));
                this.rootUpstream.allowDiscard();
                return;
            }
            this.working = true;
            if (this.subscribers != null) {
                // Each streaming subscriber receives its own retained view of the chunk.
                Iterator<BufferConsumer> it = this.subscribers.iterator();
                while (it.hasNext()) {
                    it.next().add(byteBuf.retainedSlice());
                }
            }
            if (this.reserved <= 0 && this.fullSubscribers == null) {
                // Nobody can ask for the buffered data anymore: drop our reference.
                byteBuf.release();
            } else if (readableBytes > this.limits.maxBufferSize()) {
                // Buffer limit exceeded: discard everything buffered and fail waiting full-body subscribers.
                byteBuf.release();
                if (this.buffer != null) {
                    this.buffer.release();
                    this.buffer = null;
                }
                if (this.fullSubscribers != null) {
                    // NOTE(review): fullSubscribers is not cleared here, so a later add(), complete()
                    // or error() call will try to complete the same flows again — confirm that
                    // DelayedExecutionFlow tolerates repeated completion.
                    BufferLengthExceededException bufferLengthExceededException = new BufferLengthExceededException(this.limits.maxBufferSize(), this.lengthSoFar);
                    Iterator<DelayedExecutionFlow<ByteBuf>> it2 = this.fullSubscribers.iterator();
                    while (it2.hasNext()) {
                        it2.next().completeExceptionally(bufferLengthExceededException);
                    }
                }
            } else {
                // Retain the chunk for future subscribers; the composite buffer takes over our reference.
                if (this.buffer == null) {
                    this.buffer = byteBuf.alloc().compositeBuffer();
                }
                this.buffer.addComponent(true, byteBuf);
            }
            this.working = false;
        }

        /**
         * Marks the body as complete, fixes the expected length to the number of bytes actually
         * received, and notifies all streaming and full-body subscribers.
         *
         * <p>Every full-body subscriber receives its own retained slice of the buffered data (or
         * of an empty buffer). When no reservations remain, the shared buffer reference is dropped
         * and released after the slices have been handed out.
         */
        @Override
        public void complete() {
            this.complete = true;
            this.expectedLength = this.lengthSoFar;
            if (this.subscribers != null) {
                for (BufferConsumer streaming : this.subscribers) {
                    streaming.complete();
                }
            }
            if (this.fullSubscribers == null) {
                return;
            }
            final ByteBuf payload;
            final boolean releaseAfterwards;
            if (this.buffer == null) {
                payload = Unpooled.EMPTY_BUFFER;
                releaseAfterwards = false;
            } else {
                payload = this.buffer;
                // With reservations outstanding the buffer must stay available for them.
                releaseAfterwards = this.reserved <= 0;
                if (releaseAfterwards) {
                    this.buffer = null;
                }
            }
            for (DelayedExecutionFlow<ByteBuf> waiting : this.fullSubscribers) {
                waiting.complete(payload.retainedSlice());
            }
            if (releaseAfterwards) {
                payload.release();
            }
        }

        /**
         * Records the failure, releases any buffered data and propagates the failure to all
         * streaming and full-body subscribers.
         *
         * @param th the failure to record and propagate
         */
        @Override
        public void error(Throwable th) {
            this.error = th;
            final CompositeByteBuf buffered = this.buffer;
            if (buffered != null) {
                buffered.release();
                this.buffer = null;
            }
            if (this.subscribers != null) {
                for (BufferConsumer streaming : this.subscribers) {
                    streaming.error(th);
                }
            }
            if (this.fullSubscribers != null) {
                for (DelayedExecutionFlow<ByteBuf> waiting : this.fullSubscribers) {
                    waiting.completeExceptionally(th);
                }
            }
        }

        // Static setup: the assertion flag (decompiled backing of 'assert') and the memoized
        // supplier of the leak detector for SharedBuffer instances.
        static {
            $assertionsDisabled = !StreamingNettyByteBody.class.desiredAssertionStatus();
            LEAK_DETECTOR = SupplierUtil.memoized(
                () -> ResourceLeakDetectorFactory.instance().newResourceLeakDetector(SharedBuffer.class));
        }
    }

    /**
     * Creates the root body for a shared buffer, using the buffer's root upstream.
     *
     * @param sharedBuffer shared buffer backing this body
     */
    public StreamingNettyByteBody(SharedBuffer sharedBuffer) {
        this(sharedBuffer, sharedBuffer.rootUpstream);
    }

    /**
     * Creates a body over a shared buffer with an explicit upstream; used for the root body
     * and for the bodies produced by {@code split}.
     *
     * @param sharedBuffer shared buffer backing this body
     * @param upstream     upstream handle owned by this body
     */
    private StreamingNettyByteBody(SharedBuffer sharedBuffer, BufferConsumer.Upstream upstream) {
        this.upstream = upstream;
        this.sharedBuffer = sharedBuffer;
    }

    /**
     * Claims this body for streaming consumption by the given consumer.
     *
     * <p>If the body was already claimed, {@code failClaim()} is invoked
     * (NOTE(review): presumably it throws — confirm in {@code NettyByteBody}).
     *
     * @param bufferConsumer consumer that will receive the body data
     * @return the upstream that was owned by this body
     */
    public BufferConsumer.Upstream primary(BufferConsumer bufferConsumer) {
        final BufferConsumer.Upstream claimed = this.upstream;
        if (claimed == null) {
            failClaim();
        }
        // Mark the body as claimed before subscribing.
        this.upstream = null;
        this.sharedBuffer.subscribe(bufferConsumer, claimed);
        return claimed;
    }

    /**
     * Splits this body into two bodies that share the same buffer.
     *
     * <p>The current upstream is wrapped in a balancer: this body keeps the left side, the
     * returned body gets the right side, and a spot is reserved in the shared buffer for the new body.
     *
     * @param splitBackpressureMode how backpressure is balanced between the two bodies
     * @return the new body sharing this body's data
     */
    @NonNull
    public CloseableByteBody split(@NonNull ByteBody.SplitBackpressureMode splitBackpressureMode) {
        final BufferConsumer.Upstream current = this.upstream;
        if (current == null) {
            failClaim();
        }
        final UpstreamBalancer.UpstreamPair pair = UpstreamBalancer.balancer(current, splitBackpressureMode);
        this.upstream = pair.left();
        this.sharedBuffer.reserve();
        return new StreamingNettyByteBody(this.sharedBuffer, pair.right());
    }

    /**
     * Tells the upstream of this body that its data may be discarded.
     *
     * <p>The decompiler renamed this method from {@code allowDiscard} when it merged the
     * bridge methods; the generated name is kept here so existing references still resolve.
     *
     * @return this body
     */
    @NonNull
    public StreamingNettyByteBody m69allowDiscard() {
        final BufferConsumer.Upstream current = this.upstream;
        if (current == null) {
            failClaim();
        }
        current.allowDiscard();
        return this;
    }

    /**
     * Claims this body and exposes it as a {@link Flux} of buffers.
     *
     * <p>Incoming chunks are pushed into a unicast sink. A counter tracks bytes emitted into the
     * sink but not yet delivered downstream; when it exceeds {@code limits.maxBufferSize()} the
     * sink is failed with a {@link BufferLengthExceededException}. Buffers the sink does not
     * accept are released. The upstream is started on subscription, told about consumed bytes
     * on every delivered buffer, and switched to discard mode on cancellation.
     *
     * @return publisher of the body's buffers
     */
    @Override
    protected Flux<ByteBuf> toByteBufPublisher() {
        final AtomicLong unconsumed = new AtomicLong(0L);
        // Explicitly typed: a raw Sinks.Many makes the Flux chain below raw and the lambdas untyped.
        final Sinks.Many<ByteBuf> sink = Sinks.many().unicast().onBackpressureBuffer();
        final BufferConsumer.Upstream primary = primary(new BufferConsumer() {
            @Override
            public void add(ByteBuf byteBuf) {
                long pending = unconsumed.addAndGet(byteBuf.readableBytes());
                if (pending > StreamingNettyByteBody.this.sharedBuffer.limits.maxBufferSize()) {
                    sink.tryEmitError(new BufferLengthExceededException(StreamingNettyByteBody.this.sharedBuffer.limits.maxBufferSize(), pending));
                    byteBuf.release();
                } else if (sink.tryEmitNext(byteBuf) != Sinks.EmitResult.OK) {
                    // The sink did not take the buffer, so release it here.
                    byteBuf.release();
                }
            }

            @Override
            public void complete() {
                sink.tryEmitComplete();
            }

            @Override
            public void error(Throwable th) {
                sink.tryEmitError(th);
            }
        });
        return sink.asFlux()
            .doOnSubscribe(subscription -> primary.start())
            .doOnNext(byteBuf -> {
                int size = byteBuf.readableBytes();
                unconsumed.addAndGet(-size);
                primary.onBytesConsumed(size);
            })
            .doOnDiscard(ByteBuf.class, ByteBuf::release)
            .doOnCancel(() -> {
                primary.allowDiscard();
                primary.disregardBackpressure();
            });
    }

    /**
     * Returns the expected total length of the body, if known.
     *
     * @return the expected length, or empty while the shared buffer still reports a negative value
     */
    @NonNull
    public OptionalLong expectedLength() {
        final long known = this.sharedBuffer.expectedLength;
        if (known < 0) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(known);
    }

    /**
     * Claims this body and exposes it as an {@link InputStream} by subscribing a
     * {@link PublisherAsBlocking} bridge to the buffer publisher.
     *
     * @return stream over the body's data
     */
    @NonNull
    public InputStream toInputStream() {
        PublisherAsBlocking blockingBridge = new PublisherAsBlocking();
        toByteBufPublisher().subscribe(blockingBridge);
        return new PublisherAsStream(blockingBridge);
    }

    /**
     * Claims this body and buffers it completely.
     *
     * <p>The upstream is started and told that {@code Long.MAX_VALUE} bytes were consumed before
     * the full-body subscription is made
     * (NOTE(review): presumably this disables backpressure for buffering — confirm against {@code Upstream}).
     *
     * @return flow yielding the fully buffered body, or failing
     */
    @Override
    @NonNull
    public ExecutionFlow<? extends CloseableAvailableByteBody> bufferFlow() {
        final BufferConsumer.Upstream claimed = this.upstream;
        if (claimed == null) {
            failClaim();
        }
        // Mark the body as claimed before touching the upstream.
        this.upstream = null;
        claimed.start();
        claimed.onBytesConsumed(Long.MAX_VALUE);
        final ExecutionFlow<ByteBuf> fullBody = this.sharedBuffer.subscribeFull(claimed);
        return fullBody.map(AvailableNettyByteBody::new);
    }

    /**
     * Closes this body if it has not been claimed yet; otherwise does nothing.
     *
     * <p>The upstream is told to allow discarding and to disregard backpressure, is started,
     * and the body's reservation in the shared buffer is given up without a consumer.
     */
    public void close() {
        final BufferConsumer.Upstream unclaimed = this.upstream;
        if (unclaimed != null) {
            this.upstream = null;
            unclaimed.allowDiscard();
            unclaimed.disregardBackpressure();
            unclaimed.start();
            // A null consumer releases the reservation without subscribing.
            this.sharedBuffer.subscribe(null, unclaimed);
        }
    }
}
