package ratpack.server.internal;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:ratpack/server/internal/StreamingResponseWriter.class */
public class StreamingResponseWriter implements ResponseWriter {
    private final Publisher<? extends ByteBuf> publisher;
    private boolean done;
    private Subscription subscription;

    public StreamingResponseWriter(Publisher<? extends ByteBuf> publisher) {
        this.publisher = publisher;
    }

    @Override // ratpack.server.internal.ResponseWriter
    public void write(final Channel channel, final Consumer<? super ResponseWritingListener> consumer, final Consumer<? super ChannelFuture> consumer2) {
        this.publisher.subscribe(new Subscriber<ByteBuf>() { // from class: ratpack.server.internal.StreamingResponseWriter.1
            public void onSubscribe(Subscription subscription) {
                if (subscription == null) {
                    throw new NullPointerException("'subscription' is null");
                }
                if (StreamingResponseWriter.this.subscription != null) {
                    subscription.cancel();
                    return;
                }
                StreamingResponseWriter.this.subscription = subscription;
                consumer.accept(new ResponseWritingListener() { // from class: ratpack.server.internal.StreamingResponseWriter.1.1
                    @Override // ratpack.server.internal.ResponseWritingListener
                    public void onClosed() {
                        if (StreamingResponseWriter.this.done) {
                            return;
                        }
                        StreamingResponseWriter.this.done = true;
                        StreamingResponseWriter.this.subscription.cancel();
                        consumer2.accept(channel.pipeline().newSucceededFuture());
                    }

                    @Override // ratpack.server.internal.ResponseWritingListener
                    public void onWritable() {
                        if (StreamingResponseWriter.this.done) {
                            return;
                        }
                        StreamingResponseWriter.this.subscription.request(1L);
                    }
                });
                if (channel.isWritable()) {
                    StreamingResponseWriter.this.subscription.request(1L);
                }
            }

            public void onNext(ByteBuf byteBuf) {
                byteBuf.touch();
                if (byteBuf.readableBytes() == 0) {
                    byteBuf.release();
                    StreamingResponseWriter.this.subscription.request(1L);
                } else {
                    if (StreamingResponseWriter.this.done) {
                        byteBuf.release();
                        return;
                    }
                    channel.writeAndFlush(new DefaultHttpContent(byteBuf.touch()), channel.voidPromise());
                    if (channel.isWritable()) {
                        StreamingResponseWriter.this.subscription.request(1L);
                    }
                }
            }

            public void onError(Throwable th) {
                if (th == null) {
                    throw new NullPointerException("error is null");
                }
                if (StreamingResponseWriter.this.done) {
                    return;
                }
                StreamingResponseWriter.this.done = true;
                consumer2.accept(channel.newFailedFuture(th));
            }

            public void onComplete() {
                if (StreamingResponseWriter.this.done) {
                    return;
                }
                StreamingResponseWriter.this.done = true;
                consumer2.accept(channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT));
            }
        });
    }
}
