/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.netty.body;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;
import io.micronaut.core.util.SupplierUtil;
import io.micronaut.http.body.ByteBody;
import io.micronaut.http.body.CloseableAvailableByteBody;
import io.micronaut.http.body.CloseableByteBody;
import io.micronaut.http.exceptions.BufferLengthExceededException;
import io.micronaut.http.exceptions.ContentLengthExceededException;
import io.micronaut.http.netty.PublisherAsBlocking;
import io.micronaut.http.netty.PublisherAsStream;
import io.micronaut.http.netty.body.AvailableNettyByteBody;
import io.micronaut.http.netty.body.BodySizeLimits;
import io.micronaut.http.netty.body.BufferConsumer;
import io.micronaut.http.netty.body.NettyByteBody;
import io.micronaut.http.netty.body.UpstreamBalancer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.ReferenceCounted;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetectorFactory;
import io.netty.util.ResourceLeakTracker;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Internal
public final class StreamingNettyByteBody
extends NettyByteBody
implements CloseableByteBody {
    /** Buffer state shared by this body and by every body created from it via {@link #split}. */
    private final SharedBuffer sharedBuffer;
    /**
     * Passed as the {@code forceDelay} argument when subscribing to {@link #sharedBuffer}. When
     * {@code true} the subscription is always dispatched through the event loop instead of
     * possibly running inline.
     */
    private final boolean forceDelaySubscribe;
    /** Upstream held by this body; set to {@code null} once the body is claimed or closed. */
    private BufferConsumer.Upstream upstream;

    /**
     * Creates a body backed by the given shared buffer. The body uses the buffer's root upstream
     * and does not force delayed subscription.
     *
     * @param sharedBuffer The shared buffer this body reads from
     */
    public StreamingNettyByteBody(SharedBuffer sharedBuffer) {
        this(sharedBuffer, false, sharedBuffer.rootUpstream);
    }

    /**
     * Internal constructor that wires up all three pieces of state directly.
     *
     * @param sharedBuffer        The shared buffer this body reads from
     * @param forceDelaySubscribe Whether subscriptions must always go through the event loop
     * @param upstream            The upstream handle owned by this body
     */
    private StreamingNettyByteBody(SharedBuffer sharedBuffer, boolean forceDelaySubscribe, BufferConsumer.Upstream upstream) {
        this.upstream = upstream;
        this.forceDelaySubscribe = forceDelaySubscribe;
        this.sharedBuffer = sharedBuffer;
    }

    /**
     * Claims this body and registers the given consumer with the shared buffer.
     *
     * <p>After this call the body no longer holds an upstream, so it cannot be claimed again.
     * NOTE(review): {@code failClaim()} is presumably expected to throw when the body was already
     * claimed; confirm against {@code NettyByteBody}.
     *
     * @param primary The consumer that should receive the body data
     * @return The upstream this body held before it was claimed
     */
    public BufferConsumer.Upstream primary(BufferConsumer primary) {
        final BufferConsumer.Upstream claimed = this.upstream;
        if (claimed == null) {
            failClaim();
        }
        this.upstream = null;
        sharedBuffer.subscribe(primary, claimed, forceDelaySubscribe);
        return claimed;
    }

    /**
     * Splits this body in two. This instance keeps the left side of the balanced upstream pair and
     * the returned body receives the right side; both share the same {@link SharedBuffer}.
     *
     * @param backpressureMode How backpressure is balanced between the two resulting bodies
     * @return A new body backed by the same shared buffer
     */
    @NonNull
    public CloseableByteBody split(@NonNull ByteBody.SplitBackpressureMode backpressureMode) {
        final BufferConsumer.Upstream current = this.upstream;
        if (current == null) {
            failClaim();
        }
        final UpstreamBalancer.UpstreamPair halves = UpstreamBalancer.balancer(current, backpressureMode);
        this.upstream = halves.left();
        // reserve() returns true when the reservation was deferred to the event loop; the new body
        // must then delay its own subscription as well.
        final boolean delaySubscribe = sharedBuffer.reserve();
        return new StreamingNettyByteBody(sharedBuffer, delaySubscribe, halves.right());
    }

    /**
     * Forwards {@code allowDiscard()} to the upstream currently held by this body.
     *
     * @return This body, for chaining
     */
    @NonNull
    public StreamingNettyByteBody allowDiscard() {
        final BufferConsumer.Upstream current = this.upstream;
        if (current == null) {
            failClaim();
        }
        current.allowDiscard();
        return this;
    }

    /**
     * Claims this body and exposes its content as a {@link Flux} of buffers.
     *
     * <p>Incoming buffers are pushed into a unicast sink. The number of bytes sitting in the sink
     * but not yet delivered downstream is tracked; if it exceeds the configured maximum buffer
     * size the sink is failed with a {@link BufferLengthExceededException}. Buffers that cannot be
     * emitted are released here.
     *
     * @return A publisher of the body's buffers; the upstream is started on subscription
     */
    @Override
    protected Flux<ByteBuf> toByteBufPublisher() {
        final AtomicLong unconsumed = new AtomicLong(0L);
        // The sink must be typed: a raw Sinks.Many would make the whole Flux chain raw, so the
        // lambdas below could not call ByteBuf methods on their arguments.
        final Sinks.Many<ByteBuf> sink = Sinks.many().unicast().onBackpressureBuffer();
        final BufferConsumer.Upstream upstream = this.primary(new BufferConsumer() {

            @Override
            public void add(ByteBuf buf) {
                long newLength = unconsumed.addAndGet(buf.readableBytes());
                if (newLength > StreamingNettyByteBody.this.sharedBuffer.limits.maxBufferSize()) {
                    sink.tryEmitError(new BufferLengthExceededException(StreamingNettyByteBody.this.sharedBuffer.limits.maxBufferSize(), newLength));
                    buf.release();
                } else if (sink.tryEmitNext(buf) != Sinks.EmitResult.OK) {
                    // The sink refused the buffer, so nobody else will release it.
                    buf.release();
                }
            }

            @Override
            public void complete() {
                sink.tryEmitComplete();
            }

            @Override
            public void error(Throwable e) {
                sink.tryEmitError(e);
            }
        });
        return sink.asFlux()
            .doOnSubscribe(s -> upstream.start())
            .doOnNext(bb -> {
                // Read the size once: it feeds both the local counter and the upstream signal.
                int size = bb.readableBytes();
                unconsumed.addAndGet(-size);
                upstream.onBytesConsumed(size);
            })
            .doOnDiscard(ByteBuf.class, ReferenceCounted::release)
            .doOnCancel(() -> {
                upstream.allowDiscard();
                upstream.disregardBackpressure();
            });
    }

    /**
     * Reports the expected length recorded on the shared buffer.
     *
     * @return The expected length, or empty when the recorded value is negative (unknown)
     */
    @NonNull
    public OptionalLong expectedLength() {
        final long declared = sharedBuffer.expectedLength;
        if (declared < 0L) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(declared);
    }

    /**
     * Claims this body and exposes it as an {@link InputStream} by subscribing a
     * {@link PublisherAsBlocking} bridge to the buffer publisher.
     *
     * @return A stream reading from this body
     */
    @NonNull
    public InputStream toInputStream() {
        final PublisherAsBlocking<ByteBuf> bridge = new PublisherAsBlocking<>();
        toByteBufPublisher().subscribe(bridge);
        return new PublisherAsStream(bridge);
    }

    /**
     * Claims this body and buffers it completely.
     *
     * <p>The upstream is started and told that {@link Long#MAX_VALUE} bytes were consumed before
     * the full subscription is registered on the shared buffer.
     *
     * @return A flow that yields the fully buffered body wrapped in an
     *     {@link AvailableNettyByteBody}
     */
    @Override
    @NonNull
    public ExecutionFlow<? extends CloseableAvailableByteBody> bufferFlow() {
        final BufferConsumer.Upstream claimed = this.upstream;
        if (claimed == null) {
            failClaim();
        }
        this.upstream = null;
        claimed.start();
        claimed.onBytesConsumed(Long.MAX_VALUE);
        final ExecutionFlow<ByteBuf> buffered = sharedBuffer.subscribeFull(claimed, forceDelaySubscribe);
        return buffered.map(AvailableNettyByteBody::new);
    }

    /**
     * Closes this body if it has not been claimed yet; does nothing otherwise.
     *
     * <p>The upstream is told that data may be discarded and backpressure ignored, then started.
     * Finally this body's reserved spot on the shared buffer is given up by subscribing with a
     * {@code null} consumer.
     */
    public void close() {
        final BufferConsumer.Upstream unclaimed = this.upstream;
        if (unclaimed != null) {
            this.upstream = null;
            unclaimed.allowDiscard();
            unclaimed.disregardBackpressure();
            unclaimed.start();
            sharedBuffer.subscribe(null, unclaimed, forceDelaySubscribe);
        }
    }

    public static final class SharedBuffer
    implements BufferConsumer {
        /** Memoized supplier of the leak detector used for {@code SharedBuffer} instances. */
        private static final Supplier<ResourceLeakDetector<SharedBuffer>> LEAK_DETECTOR = SupplierUtil.memoized(() -> ResourceLeakDetectorFactory.instance().newResourceLeakDetector(SharedBuffer.class));
        /** Leak tracker for this instance; may be {@code null}, so every use is null-checked. */
        @Nullable
        private final ResourceLeakTracker<SharedBuffer> tracker = LEAK_DETECTOR.get().track(this);
        /** Event loop used for the in-loop checks and for deferring reserve/subscribe work. */
        private final EventLoop eventLoop;
        /** Size limits consulted for the maximum body size and maximum buffer size. */
        private final BodySizeLimits limits;
        /** Upstream supplied at construction; told to allow discard when the body is too large. */
        private final BufferConsumer.Upstream rootUpstream;
        /** Data buffered so far for future subscribers; {@code null} when nothing is retained. */
        private CompositeByteBuf buffer;
        /** Set once {@link #complete()} has been called. */
        private boolean complete;
        /** Error passed to {@link #error(Throwable)}, if any. */
        private Throwable error;
        /** Number of reserved spots that have not subscribed yet; starts at one. */
        private int reserved = 1;
        /** Streaming consumers that receive each incoming buffer; created on first use. */
        private List<@NonNull BufferConsumer> subscribers;
        /** Flows waiting for the fully buffered body; created on first use. */
        private List<@NonNull DelayedExecutionFlow<ByteBuf>> fullSubscribers;
        /** Set while add/subscribe0/subscribeFull0 run; asserted {@code false} on their entry. */
        private boolean working = false;
        /** Set while {@link #add(ByteBuf)} runs; reserve/subscribe calls are deferred meanwhile. */
        private boolean adding = false;
        /** Total number of bytes passed to {@link #add(ByteBuf)} so far. */
        private long lengthSoFar = 0L;
        /** Expected total length, or {@code -1} while unknown. */
        private volatile long expectedLength = -1L;

        /**
         * Creates a shared buffer.
         *
         * @param loop         Event loop on which buffer operations are run or deferred
         * @param limits       Body and buffer size limits to enforce
         * @param rootUpstream Upstream of the data source feeding this buffer
         */
        public SharedBuffer(EventLoop loop, BodySizeLimits limits, BufferConsumer.Upstream rootUpstream) {
            this.rootUpstream = rootUpstream;
            this.limits = limits;
            this.eventLoop = loop;
        }

        /**
         * Reads the {@code Content-Length} header and records it as the expected length.
         *
         * <p>A missing, unparseable or negative header is ignored. If the declared length exceeds
         * the maximum body size, the buffer is failed with a
         * {@link ContentLengthExceededException}; the expected length is still recorded afterwards.
         *
         * @param headers The headers to read the content length from
         */
        public void setExpectedLengthFrom(HttpHeaders headers) {
            final String raw = headers.get(HttpHeaderNames.CONTENT_LENGTH);
            if (raw == null) {
                return;
            }
            long declared;
            try {
                declared = Long.parseLong(raw);
            } catch (NumberFormatException ignored) {
                // Not a number: treat the length as unknown.
                return;
            }
            if (declared < 0L) {
                return;
            }
            if (declared > limits.maxBodySize()) {
                error(new ContentLengthExceededException(limits.maxBodySize(), declared));
            }
            setExpectedLength(declared);
        }

        /**
         * Records the expected total length of the body.
         *
         * @param length The expected length in bytes; zero is allowed
         * @throws IllegalArgumentException if {@code length} is negative
         */
        public void setExpectedLength(long length) {
            if (length < 0L) {
                // Zero is accepted by the check, so the message must say ">= 0".
                throw new IllegalArgumentException("Should be >= 0");
            }
            this.expectedLength = length;
        }

        /**
         * Reserves a spot for a future subscriber.
         *
         * <p>The reservation runs inline only when called on the event loop while no
         * {@link #add(ByteBuf)} is in progress; otherwise it is submitted to the event loop.
         *
         * @return {@code true} if the reservation was deferred to the event loop, {@code false} if
         *     it was applied immediately
         */
        boolean reserve() {
            final boolean mustDefer = !eventLoop.inEventLoop() || adding;
            if (mustDefer) {
                eventLoop.execute(this::reserve0);
                return true;
            }
            reserve0();
            return false;
        }

        /**
         * Increments the reservation count and records the access on the leak tracker, if any.
         *
         * @throws IllegalStateException if no reservation is outstanding any more
         */
        private void reserve0() {
            if (reserved == 0) {
                throw new IllegalStateException("Cannot go from streaming state back to buffering state");
            }
            reserved++;
            if (tracker != null) {
                tracker.record();
            }
        }

        /**
         * Uses up one reserved spot, optionally attaching a streaming consumer.
         *
         * <p>The work runs inline only when no delay is forced, the caller is on the event loop,
         * and no {@link #add(ByteBuf)} is in progress; otherwise it is submitted to the event loop.
         *
         * @param subscriber       The consumer to attach, or {@code null} to only give up the spot
         * @param specificUpstream The upstream belonging to the subscribing body
         * @param forceDelay       Whether the subscription must go through the event loop
         */
        void subscribe(@Nullable BufferConsumer subscriber, BufferConsumer.Upstream specificUpstream, boolean forceDelay) {
            final boolean runInline = !forceDelay && eventLoop.inEventLoop() && !adding;
            if (runInline) {
                subscribe0(subscriber, specificUpstream);
                return;
            }
            eventLoop.execute(() -> subscribe0(subscriber, specificUpstream));
        }

        /**
         * Consumes one reserved spot and, if a subscriber is given, registers it and replays the
         * current state to it: buffered data, then any error or buffer-size violation, then
         * completion.
         *
         * @param subscriber       The consumer to attach, or {@code null} to only give up the spot
         * @param specificUpstream The upstream of the subscribing body; told to allow discard when
         *                         the data received so far already exceeds the buffer limit
         * @throws IllegalStateException if no spot is reserved
         */
        private void subscribe0(@Nullable BufferConsumer subscriber, BufferConsumer.Upstream specificUpstream) {
            assert !this.working;
            if (this.reserved == 0) {
                throw new IllegalStateException("Need to reserve a spot first");
            }
            this.working = true;
            final boolean last = --this.reserved == 0;
            if (subscriber != null) {
                if (this.subscribers == null) {
                    this.subscribers = new ArrayList<>(1);
                }
                this.subscribers.add(subscriber);
                if (this.buffer != null) {
                    if (last) {
                        // No further reservations: hand the buffer's own reference over.
                        subscriber.add(this.buffer.slice());
                        this.buffer = null;
                    } else {
                        // Others may still subscribe, so keep our reference and retain one more.
                        subscriber.add(this.buffer.retainedSlice());
                    }
                }
                if (this.error != null) {
                    subscriber.error(this.error);
                } else if (this.lengthSoFar > this.limits.maxBufferSize()) {
                    subscriber.error(new BufferLengthExceededException(this.limits.maxBufferSize(), this.lengthSoFar));
                    specificUpstream.allowDiscard();
                }
                if (this.complete) {
                    subscriber.complete();
                }
            } else if (this.buffer != null && last) {
                // Last spot given up without a consumer: nobody will read the buffered data.
                this.buffer.release();
                this.buffer = null;
            }
            if (this.tracker != null) {
                if (last) {
                    this.tracker.close(this);
                } else {
                    this.tracker.record();
                }
            }
            this.working = false;
        }

        /**
         * Uses up one reserved spot to obtain the fully buffered body.
         *
         * <p>When no delay is forced, the caller is on the event loop, and no
         * {@link #add(ByteBuf)} is in progress, the subscription runs inline and may return an
         * already-completed flow. Otherwise it is submitted to the event loop and the returned
         * flow is completed later.
         *
         * @param specificUpstream The upstream belonging to the subscribing body
         * @param forceDelay       Whether the subscription must go through the event loop
         * @return A flow yielding the complete body, or failing with the buffer's error
         */
        ExecutionFlow<ByteBuf> subscribeFull(BufferConsumer.Upstream specificUpstream, boolean forceDelay) {
            final DelayedExecutionFlow<ByteBuf> asyncFlow = DelayedExecutionFlow.create();
            if (!forceDelay && this.eventLoop.inEventLoop() && !this.adding) {
                return this.subscribeFull0(asyncFlow, specificUpstream, true);
            }
            this.eventLoop.execute(() -> {
                ExecutionFlow<ByteBuf> res = this.subscribeFull0(asyncFlow, specificUpstream, false);
                // With canReturnImmediate == false the result must be delivered through asyncFlow.
                assert res == asyncFlow;
            });
            return asyncFlow;
        }

        /**
         * Consumes one reserved spot for a full-body subscriber.
         *
         * <p>If the buffer has already failed, exceeded the buffer limit, or completed, the
         * outcome is delivered right away: as a fresh flow when {@code canReturnImmediate} is set,
         * otherwise through {@code targetFlow}. If the body is still in progress,
         * {@code targetFlow} is queued and completed later.
         *
         * @param targetFlow         The flow to complete when the result cannot be returned
         *                           directly
         * @param specificUpstream   The upstream of the subscribing body; told to allow discard
         *                           when the buffer limit is already exceeded
         * @param canReturnImmediate Whether an already-known result may be returned as a new flow
         * @return {@code targetFlow}, or an immediate flow when permitted and the result is known
         * @throws IllegalStateException if no spot is reserved
         */
        private ExecutionFlow<ByteBuf> subscribeFull0(DelayedExecutionFlow<ByteBuf> targetFlow, BufferConsumer.Upstream specificUpstream, boolean canReturnImmediate) {
            assert !this.working;
            if (this.reserved <= 0) {
                throw new IllegalStateException("Need to reserve a spot first. This should not happen, StreamingNettyByteBody should guard against it");
            }
            ExecutionFlow<ByteBuf> ret = targetFlow;
            this.working = true;
            final boolean last = --this.reserved == 0;
            Throwable error = this.error;
            if (error == null && this.lengthSoFar > this.limits.maxBufferSize()) {
                error = new BufferLengthExceededException(this.limits.maxBufferSize(), this.lengthSoFar);
                specificUpstream.allowDiscard();
            }
            if (error != null) {
                if (canReturnImmediate) {
                    ret = ExecutionFlow.error(error);
                } else {
                    targetFlow.completeExceptionally(error);
                }
            } else if (this.complete) {
                ByteBuf buf;
                if (this.buffer == null) {
                    buf = Unpooled.EMPTY_BUFFER;
                } else if (last) {
                    // No further reservations: hand the buffer's own reference over.
                    buf = this.buffer;
                    this.buffer = null;
                } else {
                    buf = this.buffer.retainedSlice();
                }
                if (canReturnImmediate) {
                    ret = ExecutionFlow.just(buf);
                } else {
                    targetFlow.complete(buf);
                }
            } else {
                // Body still in progress: park the flow until complete() or error() is called.
                if (this.fullSubscribers == null) {
                    this.fullSubscribers = new ArrayList<>(1);
                }
                this.fullSubscribers.add(targetFlow);
            }
            if (this.tracker != null) {
                if (last) {
                    this.tracker.close(this);
                } else {
                    this.tracker.record();
                }
            }
            this.working = false;
            return ret;
        }

        /**
         * Accepts one incoming buffer, forwards a retained slice to every streaming subscriber,
         * and keeps the data for future subscribers while spots are still reserved or full-body
         * subscribers are waiting.
         *
         * <p>The buffer is released without being forwarded when the body has already completed
         * or failed, or when the total length exceeds the maximum body size (in which case the
         * buffer is also failed with a {@link ContentLengthExceededException}).
         *
         * @param buf The incoming data
         * @throws IllegalStateException if more bytes arrive than the expected length allows
         */
        @Override
        public void add(ByteBuf buf) {
            assert (!this.working);
            buf.touch();
            long newLength = this.lengthSoFar + (long)buf.readableBytes();
            long expectedLength = this.expectedLength;
            if (expectedLength != -1L && newLength > expectedLength) {
                // NOTE(review): buf is not released on this path; confirm the caller releases it.
                throw new IllegalStateException("Received more bytes than specified by Content-Length");
            }
            // The running total is updated even if the data is dropped just below.
            this.lengthSoFar = newLength;
            if (this.complete || this.error != null) {
                buf.release();
                return;
            }
            // While this flag is set, reserve/subscribe calls are deferred to the event loop.
            this.adding = true;
            if (newLength > this.limits.maxBodySize()) {
                buf.release();
                this.error((Throwable)new ContentLengthExceededException(this.limits.maxBodySize(), newLength));
                this.rootUpstream.allowDiscard();
                this.adding = false;
                return;
            }
            this.working = true;
            if (this.subscribers != null) {
                // Each streaming subscriber gets its own retained slice of the data.
                for (BufferConsumer subscriber : this.subscribers) {
                    subscriber.add(buf.retainedSlice());
                }
            }
            if (this.reserved > 0 || this.fullSubscribers != null) {
                if (newLength > this.limits.maxBufferSize()) {
                    // Too much to buffer: drop this chunk and everything retained so far, and
                    // fail the waiting full-body subscribers.
                    buf.release();
                    if (this.buffer != null) {
                        this.buffer.release();
                        this.buffer = null;
                    }
                    if (this.fullSubscribers != null) {
                        BufferLengthExceededException e = new BufferLengthExceededException(this.limits.maxBufferSize(), this.lengthSoFar);
                        for (DelayedExecutionFlow<ByteBuf> fullSubscriber : this.fullSubscribers) {
                            fullSubscriber.completeExceptionally((Throwable)e);
                        }
                    }
                } else {
                    if (this.buffer == null) {
                        this.buffer = buf.alloc().compositeBuffer();
                    }
                    this.buffer.addComponent(true, buf);
                }
            } else {
                // Nobody can ask for this data later, so it does not need to be kept.
                buf.release();
            }
            this.adding = false;
            this.working = false;
        }

        /**
         * Marks the body as complete, fixes the expected length to the number of bytes received,
         * notifies all streaming subscribers, and completes every waiting full-body flow with a
         * retained slice of the buffered data (or an empty buffer when nothing was buffered).
         *
         * @throws IllegalStateException if fewer bytes were received than the expected length
         */
        @Override
        public void complete() {
            if (this.expectedLength > this.lengthSoFar) {
                throw new IllegalStateException("Received fewer bytes than specified by Content-Length");
            }
            this.complete = true;
            this.expectedLength = this.lengthSoFar;
            if (this.subscribers != null) {
                for (BufferConsumer subscriber : this.subscribers) {
                    subscriber.complete();
                }
            }
            if (this.fullSubscribers != null) {
                boolean release;
                ByteBuf buf;
                if (this.buffer == null) {
                    buf = Unpooled.EMPTY_BUFFER;
                    release = false;
                } else {
                    buf = this.buffer;
                    if (this.reserved > 0) {
                        // Spots are still reserved, so the buffer must stay for later subscribers.
                        release = false;
                    } else {
                        this.buffer = null;
                        release = true;
                    }
                }
                for (DelayedExecutionFlow<ByteBuf> fullSubscriber : this.fullSubscribers) {
                    fullSubscriber.complete(buf.retainedSlice());
                }
                if (release) {
                    // Every flow holds its own retained slice; drop the buffer's own reference.
                    buf.release();
                }
            }
        }

        /**
         * Records the error, releases any buffered data, and propagates the error to all
         * streaming subscribers and all waiting full-body flows.
         *
         * @param e The failure to propagate
         */
        @Override
        public void error(Throwable e) {
            this.error = e;
            if (this.buffer != null) {
                this.buffer.release();
                this.buffer = null;
            }
            if (this.subscribers != null) {
                for (BufferConsumer subscriber : this.subscribers) {
                    subscriber.error(e);
                }
            }
            if (this.fullSubscribers != null) {
                for (DelayedExecutionFlow<ByteBuf> fullSubscriber : this.fullSubscribers) {
                    fullSubscriber.completeExceptionally(e);
                }
            }
        }
    }
}

