package ratpack.core.http.client.internal;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.Future;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import ratpack.core.http.Headers;
import ratpack.core.http.MutableHeaders;
import ratpack.core.http.Response;
import ratpack.core.http.Status;
import ratpack.core.http.client.RequestSpec;
import ratpack.core.http.client.StreamedResponse;
import ratpack.core.http.internal.DefaultStatus;
import ratpack.core.http.internal.NettyHeadersBackedHeaders;
import ratpack.exec.Downstream;
import ratpack.exec.Execution;
import ratpack.exec.Upstream;
import ratpack.exec.stream.TransformablePublisher;
import ratpack.exec.stream.internal.BufferedWriteStream;
import ratpack.exec.stream.internal.BufferingPublisher;
import ratpack.func.Action;
import ratpack.func.Exceptions;

/* loaded from: input_file:ratpack/core/http/client/internal/ContentStreamingRequestAction.class */
public class ContentStreamingRequestAction extends RequestActionSupport<StreamedResponse> {
    private static final String HANDLER_NAME = "streaming";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ratpack/core/http/client/internal/ContentStreamingRequestAction$Handler.class */
    public class Handler extends SimpleChannelInboundHandler<HttpObject> {
        private final ChannelPipeline channelPipeline;
        private final Downstream<? super StreamedResponse> downstream;
        private List<HttpContent> received;
        private BufferedWriteStream<ByteBuf> write;
        private HttpResponse response;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:ratpack/core/http/client/internal/ContentStreamingRequestAction$Handler$DefaultStreamedResponse.class */
        public class DefaultStreamedResponse implements StreamedResponse {
            private final ChannelPipeline channelPipeline;
            private final Status status;
            private final Headers headers;

            private DefaultStreamedResponse(ChannelPipeline channelPipeline) {
                this.channelPipeline = channelPipeline;
                this.headers = new NettyHeadersBackedHeaders(Handler.this.response.headers());
                this.status = new DefaultStatus(Handler.this.response.status());
            }

            @Override // ratpack.core.http.client.HttpResponse
            public Status getStatus() {
                return this.status;
            }

            @Override // ratpack.core.http.client.HttpResponse
            public int getStatusCode() {
                return this.status.getCode();
            }

            @Override // ratpack.core.http.client.HttpResponse
            public Headers getHeaders() {
                return this.headers;
            }

            @Override // ratpack.core.http.client.StreamedResponse
            public TransformablePublisher<ByteBuf> getBody() {
                return new BufferingPublisher((v0) -> {
                    v0.release();
                }, bufferedWriteStream -> {
                    Handler.this.write = bufferedWriteStream;
                    if (Handler.this.received != null) {
                        for (HttpContent httpContent : Handler.this.received) {
                            if (httpContent.content().readableBytes() > 0) {
                                bufferedWriteStream.item(httpContent.content().touch("emitting to user code"));
                            } else {
                                httpContent.release();
                            }
                            if (httpContent instanceof LastHttpContent) {
                                ContentStreamingRequestAction.this.dispose(this.channelPipeline, Handler.this.response).addListener(future -> {
                                    bufferedWriteStream.complete();
                                });
                            }
                        }
                        Handler.this.received.clear();
                    }
                    return new Subscription() { // from class: ratpack.core.http.client.internal.ContentStreamingRequestAction.Handler.DefaultStreamedResponse.1
                        public void request(long j) {
                            DefaultStreamedResponse.this.channelPipeline.read();
                        }

                        public void cancel() {
                            ContentStreamingRequestAction.this.forceDispose(DefaultStreamedResponse.this.channelPipeline);
                        }
                    };
                });
            }

            @Override // ratpack.core.http.client.StreamedResponse
            public void forwardTo(Response response) {
                forwardTo(response, Action.noop());
            }

            @Override // ratpack.core.http.client.StreamedResponse
            public void forwardTo(Response response, Action<? super MutableHeaders> action) {
                MutableHeaders headers = response.getHeaders();
                headers.copy(this.headers);
                headers.remove(HttpHeaderNames.CONNECTION);
                Exceptions.uncheck(() -> {
                    action.execute(headers);
                });
                response.status(this.status);
                response.sendStream(getBody().bindExec((v0) -> {
                    v0.release();
                }));
            }
        }

        public Handler(ChannelPipeline channelPipeline, Downstream<? super StreamedResponse> downstream) {
            super(false);
            this.channelPipeline = channelPipeline;
            this.downstream = downstream;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
            if (httpObject instanceof HttpResponse) {
                this.response = (HttpResponse) httpObject;
                int code = this.response.status().code();
                if ((code >= 100 && code < 200) || code == 204) {
                    this.response.headers().remove(HttpHeaderNames.CONTENT_LENGTH);
                }
                this.channelPipeline.channel().config().setAutoRead(false);
                ContentStreamingRequestAction.this.execution.onComplete(() -> {
                    if (this.write == null) {
                        ContentStreamingRequestAction.this.forceDispose(this.channelPipeline);
                    }
                    if (this.received != null) {
                        this.received.forEach((v0) -> {
                            v0.release();
                        });
                    }
                });
                this.downstream.success(new DefaultStreamedResponse(this.channelPipeline));
                return;
            }
            if (httpObject instanceof HttpContent) {
                HttpContent httpContent = ((HttpContent) httpObject).touch();
                boolean z = httpContent.content().readableBytes() > 0;
                boolean z2 = httpObject instanceof LastHttpContent;
                if (this.write == null) {
                    if (z || z2) {
                        if (this.received == null) {
                            this.received = new ArrayList();
                        }
                        this.received.add(httpContent.touch());
                    } else {
                        httpContent.release();
                    }
                    if (z2) {
                        ContentStreamingRequestAction.this.dispose(channelHandlerContext.pipeline(), this.response);
                        return;
                    }
                    return;
                }
                if (z) {
                    this.write.item(httpContent.content().touch("emitting to user code"));
                } else {
                    httpContent.release();
                }
                if (z2) {
                    ContentStreamingRequestAction.this.dispose(channelHandlerContext.pipeline(), this.response).addListener(future -> {
                        this.write.complete();
                    });
                } else if (this.write.getRequested() > 0) {
                    channelHandlerContext.read();
                }
            }
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
            Throwable decorateException = ContentStreamingRequestAction.this.decorateException(th);
            ContentStreamingRequestAction.this.forceDispose(channelHandlerContext.pipeline()).addListener(future -> {
                if (!future.isSuccess()) {
                    decorateException.addSuppressed(future.cause());
                }
                if (this.write == null) {
                    this.downstream.error(decorateException);
                } else {
                    this.write.error(decorateException);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ContentStreamingRequestAction(URI uri, HttpClientInternal httpClientInternal, int i, boolean z, Execution execution, Action<? super RequestSpec> action) throws Exception {
        super(uri, httpClientInternal, i, z, execution, action);
    }

    @Override // ratpack.core.http.client.internal.RequestActionSupport
    protected Future<Void> doDispose(ChannelPipeline channelPipeline, boolean z) {
        channelPipeline.remove(HANDLER_NAME);
        return super.doDispose(channelPipeline, z);
    }

    @Override // ratpack.core.http.client.internal.RequestActionSupport
    protected void addResponseHandlers(ChannelPipeline channelPipeline, Downstream<? super StreamedResponse> downstream) {
        channelPipeline.addLast(HANDLER_NAME, new Handler(channelPipeline, downstream));
    }

    @Override // ratpack.core.http.client.internal.RequestActionSupport
    protected Upstream<StreamedResponse> onRedirect(URI uri, int i, boolean z, Action<? super RequestSpec> action) throws Exception {
        return new ContentStreamingRequestAction(uri, this.client, i, z, this.execution, action);
    }

    @Override // ratpack.core.http.client.internal.RequestActionSupport
    public /* bridge */ /* synthetic */ void connect(Downstream<? super StreamedResponse> downstream) throws Exception {
        super.connect(downstream);
    }
}
