/*
 * Decompiled with CFR 0.152.
 */
package ratpack.server.internal;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.server.internal.ResponseWriter;
import ratpack.server.internal.ResponseWritingListener;

class StreamingResponseWriter
implements ResponseWriter {
    private final Publisher<? extends ByteBuf> publisher;
    private boolean done;
    private Subscription subscription;

    public StreamingResponseWriter(Publisher<? extends ByteBuf> publisher) {
        this.publisher = publisher;
    }

    @Override
    public void write(final Channel channel, final Consumer<? super ResponseWritingListener> listenerReceiver, final Consumer<? super ChannelFuture> then) {
        this.publisher.subscribe((Subscriber)new Subscriber<ByteBuf>(){

            public void onSubscribe(Subscription incomingSubscription) {
                if (incomingSubscription == null) {
                    throw new NullPointerException("'subscription' is null");
                }
                if (StreamingResponseWriter.this.subscription != null) {
                    incomingSubscription.cancel();
                    return;
                }
                StreamingResponseWriter.this.subscription = incomingSubscription;
                listenerReceiver.accept(new ResponseWritingListener(){

                    @Override
                    public void onClosed() {
                        if (!StreamingResponseWriter.this.done) {
                            StreamingResponseWriter.this.done = true;
                            StreamingResponseWriter.this.subscription.cancel();
                            then.accept(channel.pipeline().newSucceededFuture());
                        }
                    }

                    @Override
                    public void onWritable() {
                        if (!StreamingResponseWriter.this.done) {
                            StreamingResponseWriter.this.subscription.request(1L);
                        }
                    }
                });
                if (channel.isWritable()) {
                    StreamingResponseWriter.this.subscription.request(1L);
                }
            }

            public void onNext(ByteBuf o) {
                o.touch();
                if (o.readableBytes() == 0) {
                    o.release();
                    StreamingResponseWriter.this.subscription.request(1L);
                    return;
                }
                if (StreamingResponseWriter.this.done) {
                    o.release();
                    return;
                }
                channel.writeAndFlush((Object)new DefaultHttpContent(o.touch()), channel.voidPromise());
                if (channel.isWritable()) {
                    StreamingResponseWriter.this.subscription.request(1L);
                }
            }

            public void onError(Throwable t) {
                if (t == null) {
                    throw new NullPointerException("error is null");
                }
                if (!StreamingResponseWriter.this.done) {
                    StreamingResponseWriter.this.done = true;
                    then.accept(channel.newFailedFuture(t));
                }
            }

            public void onComplete() {
                if (!StreamingResponseWriter.this.done) {
                    StreamingResponseWriter.this.done = true;
                    then.accept(channel.writeAndFlush((Object)LastHttpContent.EMPTY_LAST_CONTENT));
                }
            }
        });
    }
}

