/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.server.netty.handler;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.netty.EventLoopFlow;
import io.micronaut.http.netty.reactive.HotObservable;
import io.micronaut.http.server.netty.body.ByteBody;
import io.micronaut.http.server.netty.handler.BlockingWriter;
import io.micronaut.http.server.netty.handler.Compressor;
import io.micronaut.http.server.netty.handler.OutboundAccess;
import io.micronaut.http.server.netty.handler.RequestHandler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.OrderedEventExecutor;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

@Internal
abstract class MultiplexedServerHandler {
    /** Logger obtained for the runtime class of this handler; also used by the nested stream classes. */
    final Logger LOG = LoggerFactory.getLogger(this.getClass());
    /**
     * Channel handler context used for buffer allocation, executor access and promise creation.
     * It is never assigned inside this class. NOTE(review): presumably set by the concrete
     * subclass before any stream is used — confirm.
     */
    ChannelHandlerContext ctx;
    /** Receives accepted requests and is notified when a response has been finished. */
    private final RequestHandler requestHandler;
    /** Optional compressor; when {@code null} no compression session is ever prepared. */
    @Nullable
    private Compressor compressor;

    /**
     * Create a handler that forwards requests to the given request handler.
     *
     * @param requestHandler receiver of accepted requests and of response-written notifications
     */
    MultiplexedServerHandler(RequestHandler requestHandler) {
        this.requestHandler = requestHandler;
    }

    /**
     * Set (or clear) the compressor consulted when a response is about to be written.
     *
     * @param compressor the compressor to use, or {@code null} to disable compression
     */
    final void compressor(@Nullable Compressor compressor) {
        this.compressor = compressor;
    }

    /**
     * Called by the streams after they have queued response writes, and after buffered request
     * input has been drained. NOTE(review): implementations are expected to flush the underlying
     * channel — confirm against the subclasses.
     */
    abstract void flush();

    /**
     * Error emitted to the request body sink when {@code closeIfNoSubscriber} finds that nobody
     * has subscribed to the body. A single shared instance is used, so stack trace capture is
     * disabled by overriding {@link #fillInStackTrace()}.
     */
    private static final class SubscribedTooLateException extends RuntimeException {
        /** Fixed exception message (compile-time constant, so it is safe to use from the constructor). */
        private static final String MESSAGE = "Subscribed too late to message body (after closeIfNoSubscriber)";

        /** Shared instance. */
        static final SubscribedTooLateException INSTANCE = new SubscribedTooLateException();

        SubscribedTooLateException() {
            super(MESSAGE);
        }

        /**
         * Skips stack trace capture.
         *
         * @return this exception, unchanged
         */
        @Override
        public Throwable fillInStackTrace() {
            return this;
        }
    }

    abstract class MultiplexedStream
    implements OutboundAccess {
        /** Request head, stored by {@code onHeadersRead}. */
        private HttpRequest request;
        /** Retained body chunks received before the request was accepted; {@code null} when nothing is buffered. */
        private List<ByteBuf> bufferedContent;
        /** Non-null once {@code devolveToStreaming} has switched this stream to streamed body delivery. */
        private InputStreamer streamer;
        /** Opaque value set through {@code attachment(Object)} and handed back to {@code RequestHandler.responseWritten}. */
        private Object attachment;
        /** {@code true} once the request has been passed to the request handler. */
        private boolean requestAccepted;
        /** {@code true} once {@code finish()} has run; guards against writing a second response. */
        private boolean responseDone;
        /** Writer created by {@code writeStream}; discarded by {@code finish()}. */
        private BlockingWriter blockingWriter;
        /** Compression session set by {@code prepareCompression}; {@code null} when the response is not compressed. */
        private Compressor.Session compressionSession;

        /** Creates a stream with no request, no buffered content and no response state. */
        MultiplexedStream() {
        }

        /**
         * Report that request body bytes have been consumed. Callers in this class pass the number
         * of readable bytes of the body (or body chunk) that was handed on.
         * NOTE(review): presumably used for flow-control accounting — confirm in the subclass.
         *
         * @param var1 number of bytes consumed
         */
        abstract void notifyDataConsumed(int var1);

        /**
         * Attempt to reset this stream because of the given error. When this returns {@code false}
         * the streamed-response error path logs that the error cannot be forwarded to the client.
         *
         * @param var1 the error that caused the reset
         * @return NOTE(review): presumably {@code true} if the reset could be signalled — confirm
         */
        abstract boolean reset(Throwable var1);

        /**
         * Called when the request body is abandoned because nobody subscribed to it
         * (see {@code InputStreamer.closeIfNoSubscriber}).
         */
        abstract void closeInput();

        /**
         * Record the request head of this stream. If the head also ends the stream, the request
         * is accepted immediately with an empty body.
         *
         * @param headers     the request head
         * @param endOfStream whether no body will follow
         * @throws IllegalStateException if the request was already passed to the request handler
         */
        final void onHeadersRead(HttpRequest headers, boolean endOfStream) {
            if (this.requestAccepted) {
                throw new IllegalStateException("Request already accepted");
            }
            this.request = headers;
            if (!endOfStream) {
                // Body data is still expected; acceptance happens in onDataRead/devolveToStreaming.
                return;
            }
            this.requestAccepted = true;
            final MultiplexedServerHandler handler = MultiplexedServerHandler.this;
            handler.requestHandler.accept(handler.ctx, headers, ByteBody.empty(), this);
        }

        /**
         * Handle one chunk of request body data.
         * <ul>
         *   <li>Before streaming has started, chunks are retained and buffered. When the final chunk
         *       arrives the buffered chunks are combined into one buffer and the request is accepted
         *       with that full body.</li>
         *   <li>Once streaming has started, each chunk is retained, wrapped as HTTP content and
         *       emitted to the streamer's sink; a chunk the sink rejects is released again.</li>
         * </ul>
         * The buffer is only retained after the state check has passed, so a frame rejected with
         * {@link IllegalStateException} does not leave behind an extra reference.
         *
         * @param data        the body chunk; this method takes its own reference via {@code retain()}
         * @param endOfStream whether this is the last chunk of the request
         * @return always {@code 0}; consumed bytes are reported through {@code notifyDataConsumed}
         * @throws IllegalStateException if the request was already accepted without streaming
         */
        final int onDataRead(ByteBuf data, boolean endOfStream) {
            if (this.streamer == null) {
                // Validate BEFORE retaining: throwing after retain() would leak the reference.
                if (this.requestAccepted) {
                    throw new IllegalStateException("Request already accepted");
                }
                data.retain();
                if (endOfStream) {
                    ByteBuf fullBody;
                    if (this.bufferedContent == null) {
                        fullBody = data;
                    } else {
                        // The composite takes over the references held by the buffered chunks.
                        CompositeByteBuf composite = MultiplexedServerHandler.this.ctx.alloc().compositeBuffer();
                        for (ByteBuf buffered : this.bufferedContent) {
                            composite.addComponent(true, buffered);
                        }
                        composite.addComponent(true, data);
                        fullBody = composite;
                    }
                    this.bufferedContent = null;
                    this.requestAccepted = true;
                    this.notifyDataConsumed(fullBody.readableBytes());
                    MultiplexedServerHandler.this.requestHandler.accept(MultiplexedServerHandler.this.ctx, this.request, ByteBody.of(fullBody), this);
                } else {
                    if (this.bufferedContent == null) {
                        this.bufferedContent = new ArrayList<>();
                    }
                    this.bufferedContent.add(data);
                }
            } else {
                data.retain();
                HttpContent chunk = endOfStream ? new DefaultLastHttpContent(data) : new DefaultHttpContent(data);
                if (this.streamer.sink.tryEmitNext(chunk) != Sinks.EmitResult.OK) {
                    // The sink did not take the chunk, so drop the reference taken above.
                    chunk.release();
                }
                if (endOfStream) {
                    this.streamer.sink.tryEmitComplete();
                }
            }
            return 0;
        }

        /**
         * Switch this stream from buffering the body to streaming it. Does nothing if the request
         * was already accepted, streaming has already started, or no request head has been read.
         * Otherwise a new streamer is created, any chunks buffered so far are emitted to its sink,
         * and the request is accepted with a streamed body. The advertised body length is taken
         * from the {@code Content-Length} header, or {@code -1} if absent.
         */
        final void devolveToStreaming() {
            if (this.requestAccepted || this.streamer != null || this.request == null) {
                return;
            }
            this.streamer = new InputStreamer();
            if (this.bufferedContent != null) {
                for (ByteBuf buf : this.bufferedContent) {
                    // The sink is typed Sinks.Many<HttpContent>, so the content is emitted without a cast.
                    this.streamer.sink.tryEmitNext(new DefaultHttpContent(buf));
                }
                this.bufferedContent = null;
            }
            this.requestAccepted = true;
            MultiplexedServerHandler.this.requestHandler.accept(MultiplexedServerHandler.this.ctx, this.request, ByteBody.of(this.streamer, this.request.headers().getInt((CharSequence)HttpHeaderNames.CONTENT_LENGTH, -1)), this);
        }

        /**
         * Handle a go-away affecting this stream; treated exactly like a stream reset.
         *
         * @param e the error to propagate to the request body, if it is being streamed
         */
        final void onGoAwayRead(Exception e) {
            this.onRstStreamRead(e);
        }

        /**
         * Handle a reset of this stream. If the body is being streamed, the error is emitted to
         * the streamer's sink. Chunks that were retained and buffered by {@code onDataRead} but
         * never handed to the request handler are released here, because this class is the only
         * owner of those references and nothing else would free them. Finally the response is
         * marked as finished.
         *
         * @param e the error describing the reset
         */
        final void onRstStreamRead(Exception e) {
            if (this.streamer != null) {
                this.streamer.sink.tryEmitError(e);
            }
            if (this.bufferedContent != null) {
                // Still buffering: the request was never accepted, so drop our references.
                for (ByteBuf buf : this.bufferedContent) {
                    buf.release();
                }
                this.bufferedContent = null;
            }
            this.finish();
        }

        /**
         * Mark the response as done exactly once. On the first call the blocking writer and the
         * compression session (if any) are discarded and the request handler is told the response
         * has been written.
         *
         * @return {@code true} if this call finished the response, {@code false} if it was already finished
         */
        private boolean finish() {
            if (this.responseDone) {
                return false;
            }
            this.responseDone = true;

            final BlockingWriter writer = this.blockingWriter;
            if (writer != null) {
                writer.discard();
            }
            final Compressor.Session session = this.compressionSession;
            if (session != null) {
                session.discard();
            }
            MultiplexedServerHandler.this.requestHandler.responseWritten(this.attachment);
            return true;
        }

        /**
         * @return the buffer allocator of the handler's channel context
         */
        @NonNull
        public final ByteBufAllocator alloc() {
            return MultiplexedServerHandler.this.ctx.alloc();
        }

        /**
         * Write a complete response (head and body) in one go. When called off the event loop the
         * call is re-dispatched onto it. If a compression session applies, the body is pushed
         * through it and the content length is fixed up before the head is written.
         *
         * @param response     the full response to write
         * @param headResponse not used by this implementation
         * @throws IllegalStateException if a response was already written for this stream
         */
        public final void writeFull(@NonNull FullHttpResponse response, boolean headResponse) {
            if (this.responseDone) {
                throw new IllegalStateException("Response already written");
            }
            final ChannelHandlerContext context = MultiplexedServerHandler.this.ctx;
            if (!context.executor().inEventLoop()) {
                context.executor().execute(() -> this.writeFull(response, headResponse));
                return;
            }

            this.prepareCompression(response, true);
            ByteBuf body = response.content();
            boolean empty = !body.isReadable();
            final Compressor.Session session = this.compressionSession;
            if (session != null) {
                session.push(body);
                session.finish();
                session.fixContentLength((HttpResponse)response);
                body = session.poll();
                // No compressed output means there is nothing to send after the head.
                empty = body == null;
            }

            this.writeHeaders(response, empty, context.voidPromise());
            if (!empty) {
                this.writeData0(body, true, context.voidPromise());
            }
            if (!this.finish()) {
                throw new IllegalStateException("Response already written");
            }
            MultiplexedServerHandler.this.flush();
        }

        /**
         * Write a response whose body is produced by a publisher. When called off the event loop
         * the call is re-dispatched onto it. The head is written first, then the publisher is
         * subscribed with a demand of one item at a time: the next item is requested only after
         * the previous write succeeded, and the subscription is cancelled when a write fails.
         *
         * @param response the response head
         * @param content  publisher of body chunks
         * @throws IllegalStateException if a response was already written for this stream
         */
        public final void writeStreamed(@NonNull HttpResponse response, @NonNull Publisher<HttpContent> content) {
            if (this.responseDone) {
                throw new IllegalStateException("Response already written");
            }
            if (!MultiplexedServerHandler.this.ctx.executor().inEventLoop()) {
                MultiplexedServerHandler.this.ctx.executor().execute(() -> this.writeStreamed(response, content));
                return;
            }
            this.prepareCompression(response, false);
            this.writeHeaders(response, false, MultiplexedServerHandler.this.ctx.voidPromise());
            content.subscribe(new Subscriber<HttpContent>() {
                // NOTE(review): executeNow appears to return true when the caller should run the
                // task itself and false when the flow has scheduled it — confirm in EventLoopFlow.
                final EventLoopFlow flow = new EventLoopFlow((OrderedEventExecutor)MultiplexedServerHandler.this.ctx.channel().eventLoop());
                Subscription subscription;

                @Override
                public void onSubscribe(Subscription s) {
                    this.subscription = s;
                    s.request(1L);
                }

                @Override
                public void onNext(HttpContent httpContent) {
                    if (this.flow.executeNow(() -> this.onNext0(httpContent))) {
                        this.onNext0(httpContent);
                    }
                }

                /** Writes one chunk and requests the next one once the write has succeeded. */
                private void onNext0(HttpContent chunk) {
                    ByteBuf payload = chunk.content();
                    ChannelPromise written = MultiplexedServerHandler.this.ctx.newPromise();
                    written.addListener((ChannelFutureListener) future -> {
                        if (!future.isSuccess()) {
                            MultiplexedStream.this.logStreamWriteFailure(future.cause());
                            this.subscription.cancel();
                            return;
                        }
                        this.subscription.request(1L);
                    });
                    MultiplexedStream.this.writeData(payload, false, written);
                    MultiplexedServerHandler.this.flush();
                }

                @Override
                public void onError(Throwable t) {
                    if (this.flow.executeNow(() -> this.onError0(t))) {
                        this.onError0(t);
                    }
                }

                /** Resets the stream on the event loop; logs a warning if the reset is refused. */
                private void onError0(Throwable t) {
                    EventLoop loop = MultiplexedServerHandler.this.ctx.channel().eventLoop();
                    if (!loop.inEventLoop()) {
                        loop.execute(() -> this.onError(t));
                        return;
                    }
                    boolean resetSent = MultiplexedStream.this.reset(t);
                    if (!resetSent) {
                        MultiplexedServerHandler.this.LOG.warn("Reactive response received an error after some data has already been written. This error cannot be forwarded to the client.", t);
                    }
                    MultiplexedStream.this.finish();
                    MultiplexedServerHandler.this.flush();
                }

                @Override
                public void onComplete() {
                    if (this.flow.executeNow(this::onComplete0)) {
                        this.onComplete0();
                    }
                }

                /** Ends the stream with an empty final data write, unless the response is already finished. */
                private void onComplete0() {
                    EventLoop loop = MultiplexedServerHandler.this.ctx.channel().eventLoop();
                    if (!loop.inEventLoop()) {
                        loop.execute(this::onComplete);
                        return;
                    }
                    if (!MultiplexedStream.this.finish()) {
                        return;
                    }
                    MultiplexedStream.this.writeData(Unpooled.EMPTY_BUFFER, true, MultiplexedServerHandler.this.ctx.voidPromise());
                    MultiplexedServerHandler.this.flush();
                }
            });
        }

        /**
         * Log a failed response data write. Every cause, whether or not it is an
         * {@code Http2Exception}, is logged at debug level with the same message.
         *
         * @param cause the failure reported by the write promise
         */
        private void logStreamWriteFailure(Throwable cause) {
            // A debug call is a no-op when debug logging is disabled, so no explicit guard is needed.
            MultiplexedServerHandler.this.LOG.debug("Stream shut down by client while sending data", cause);
        }

        /**
         * Write a response whose body is read from a blocking {@link InputStream}. When called off
         * the event loop the call is re-dispatched onto it. A {@code BlockingWriter} is created
         * that writes the head, forwards each chunk to this stream and flushes, and ends the stream
         * with an empty final data write.
         *
         * @param response        the response head
         * @param stream          the body source
         * @param executorService executor handed to the blocking writer
         * @throws IllegalStateException if a response was already written for this stream
         */
        public final void writeStream(final @NonNull HttpResponse response, @NonNull InputStream stream, @NonNull ExecutorService executorService) {
            if (this.responseDone) {
                throw new IllegalStateException("Response already written");
            }
            if (!MultiplexedServerHandler.this.ctx.executor().inEventLoop()) {
                MultiplexedServerHandler.this.ctx.executor().execute(() -> this.writeStream(response, stream, executorService));
                return;
            }
            this.prepareCompression(response, false);
            // Constructor arguments are passed in the instance-creation expression: an anonymous
            // class cannot invoke super(...) explicitly.
            this.blockingWriter = new BlockingWriter(MultiplexedServerHandler.this.ctx.alloc(), stream, executorService) {
                /** Incremented on every suspension; a listener only resumes writing if it is still the latest one. */
                int lastSuspend = 0;

                @Override
                protected void writeStart() {
                    MultiplexedStream.this.writeHeaders(response, false, MultiplexedServerHandler.this.ctx.voidPromise());
                }

                @Override
                protected boolean writeData(ByteBuf buf) {
                    ChannelPromise promise = MultiplexedServerHandler.this.ctx.newPromise();
                    MultiplexedStream.this.writeData(buf, false, promise);
                    MultiplexedServerHandler.this.flush();
                    if (promise.isSuccess()) {
                        // The write completed synchronously; the caller may keep writing.
                        return true;
                    }
                    int suspend = ++this.lastSuspend;
                    promise.addListener((ChannelFutureListener) future -> {
                        if (!MultiplexedServerHandler.this.ctx.executor().inEventLoop()) {
                            throw new IllegalStateException("Should complete in event loop");
                        }
                        if (future.isSuccess()) {
                            if (suspend == this.lastSuspend) {
                                this.writeSome();
                            }
                        } else {
                            MultiplexedStream.this.logStreamWriteFailure(future.cause());
                            this.discard();
                        }
                    });
                    return false;
                }

                @Override
                protected void writeLast() {
                    MultiplexedStream.this.writeData(Unpooled.EMPTY_BUFFER, true, MultiplexedServerHandler.this.ctx.voidPromise());
                    MultiplexedServerHandler.this.flush();
                    MultiplexedStream.this.finish();
                }

                @Override
                protected void writeSomeAsync() {
                    MultiplexedServerHandler.this.ctx.executor().execute(this::writeSome);
                }
            };
            this.blockingWriter.writeSome();
        }

        /**
         * Store an opaque value that is passed to {@code RequestHandler.responseWritten} when the
         * response is finished.
         *
         * @param attachment the value to hand back
         */
        @Override
        public final void attachment(Object attachment) {
            this.attachment = attachment;
        }

        /** No-op in this implementation: nothing is closed after the response is written. */
        @Override
        public final void closeAfterWrite() {
        }

        /**
         * Ask the configured compressor, if any, whether this response should be compressed, and
         * remember the resulting session. For responses that are not written in full, the
         * {@code Content-Length} header is removed when compression applies.
         *
         * @param headers the response head about to be written
         * @param full    whether the response is written in one piece
         */
        private void prepareCompression(HttpResponse headers, boolean full) {
            final Compressor configured = MultiplexedServerHandler.this.compressor;
            if (configured == null) {
                return;
            }
            final Compressor.Session session = configured.prepare(MultiplexedServerHandler.this.ctx, this.request, headers);
            if (session == null) {
                return;
            }
            if (!full) {
                headers.headers().remove((CharSequence)HttpHeaderNames.CONTENT_LENGTH);
            }
            this.compressionSession = session;
        }

        /**
         * Write the response head for this stream.
         *
         * @param var1 the response head
         * @param var2 passed as {@code true} by {@code writeFull} when no body data follows;
         *             NOTE(review): presumably the end-of-stream flag — confirm in the subclass
         * @param var3 promise for the write
         */
        abstract void writeHeaders(HttpResponse var1, boolean var2, ChannelPromise var3);

        /**
         * Write response body data, routing it through the compression session when one is active.
         *
         * @param data      the data to write
         * @param endStream whether this is the final data of the response
         * @param promise   promise for the write
         */
        private void writeData(ByteBuf data, boolean endStream, ChannelPromise promise) {
            if (this.compressionSession != null) {
                this.writeDataCompressing(data, endStream, promise);
                return;
            }
            this.writeData0(data, endStream, promise);
        }

        /**
         * Push data through the active compression session and write whatever compressed output
         * is available. If the session yields nothing, an empty final write is issued when the
         * stream ends; otherwise the promise is completed without writing.
         *
         * @param data      the uncompressed data
         * @param endStream whether this is the final data of the response
         * @param promise   promise for the write
         */
        private void writeDataCompressing(ByteBuf data, boolean endStream, ChannelPromise promise) {
            final Compressor.Session session = this.compressionSession;
            session.push(data);
            if (endStream) {
                session.finish();
            }
            final ByteBuf compressed = session.poll();
            if (compressed != null) {
                this.writeData0(compressed, endStream, promise);
                return;
            }
            if (endStream) {
                // Nothing to send, but the end of the stream still has to be signalled.
                this.writeData0(Unpooled.EMPTY_BUFFER, true, promise);
            } else {
                promise.trySuccess();
            }
        }

        /**
         * Write already-prepared (possibly compressed) response body data to the stream.
         *
         * @param var1 the data to write
         * @param var2 whether this write ends the stream (callers pass their {@code endStream} flag)
         * @param var3 promise for the write
         */
        abstract void writeData0(ByteBuf var1, boolean var2, ChannelPromise var3);

        /**
         * Streams the request body to a single subscriber. Chunks are emitted into a unicast sink
         * backed by an unbounded queue; each chunk delivered downstream is reported to the stream
         * as consumed.
         */
        private class InputStreamer implements HotObservable<HttpContent> {
            /** Backing queue of the sink; also inspected to decide when to flush. */
            final Queue<HttpContent> queue = Queues.<HttpContent>unbounded().get();
            /** Sink the stream emits body chunks, completion and errors into. */
            final Sinks.Many<HttpContent> sink = Sinks.many().unicast().onBackpressureBuffer(this.queue);
            /** Subscriber-facing view of the sink that reports every delivered chunk. */
            final Flux<HttpContent> flux = this.sink.asFlux().doOnNext(this::onNext);

            private InputStreamer() {
            }

            /** Reports the readable bytes of a chunk that was delivered to the subscriber. */
            private void onNext(HttpContent c) {
                this.consumed(c.content().readableBytes());
            }

            /**
             * Notify the stream, on the event loop, that {@code n} bytes were consumed, and flush
             * once the queue has been drained.
             */
            private void consumed(int n) {
                EventLoop loop = MultiplexedServerHandler.this.ctx.channel().eventLoop();
                if (!loop.inEventLoop()) {
                    loop.execute(() -> this.consumed(n));
                    return;
                }
                MultiplexedStream.this.notifyDataConsumed(n);
                if (this.queue.isEmpty()) {
                    MultiplexedServerHandler.this.flush();
                }
            }

            /**
             * If nobody has subscribed to the body yet, fail the sink with
             * {@code SubscribedTooLateException} and close the stream's input. Runs on the event loop.
             */
            public void closeIfNoSubscriber() {
                EventLoop loop = MultiplexedServerHandler.this.ctx.channel().eventLoop();
                if (!loop.inEventLoop()) {
                    loop.execute(this::closeIfNoSubscriber);
                    return;
                }
                if (this.sink.currentSubscriberCount() != 0) {
                    return;
                }
                this.sink.tryEmitError(SubscribedTooLateException.INSTANCE);
                MultiplexedStream.this.closeInput();
            }

            /** Subscribes the given subscriber to the body chunks. */
            public void subscribe(Subscriber<? super HttpContent> s) {
                this.flux.subscribe(s);
            }
        }
    }
}

