package ratpack.sse;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.ReferenceCountUtil;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.api.Nullable;
import ratpack.exec.Execution;
import ratpack.exec.internal.Continuation;
import ratpack.exec.internal.DefaultExecution;
import ratpack.func.Action;
import ratpack.handling.Context;
import ratpack.http.Response;
import ratpack.http.internal.HttpHeaderConstants;
import ratpack.render.Renderable;
import ratpack.server.ServerConfig;
import ratpack.sse.internal.DefaultEvent;
import ratpack.sse.internal.ServerSentEventEncoder;
import ratpack.sse.internal.ServerSentEventStreamBufferSettings;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.bytebuf.internal.ByteBufBufferingSubscription;

/* loaded from: input_file:ratpack/sse/ServerSentEvents.class */
public class ServerSentEvents implements Renderable {
    /**
     * Shared keep-alive payload: the UTF-8 bytes of {@code ": keepalive heartbeat\n\n"} wrapped in an
     * unreleasable buffer. It is handed to the buffering subscription created in {@code renderStream}.
     */
    private static final ByteBuf HEARTBEAT = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(": keepalive heartbeat\n\n".getBytes(StandardCharsets.UTF_8)));
    /** Source of the events that are encoded and streamed to the client. */
    private final Publisher<? extends ServerSentEvent> publisher;
    /**
     * When {@code true}, {@code render} waits for the first signal from the publisher and replies with
     * a 204 No Content response if the publisher completes without emitting anything.
     */
    private final boolean noContentOnEmpty;

    /**
     * Heartbeat interval passed to the buffering subscription; {@code null} is replaced by
     * {@link Duration#ZERO} (presumably meaning "no heartbeat" - confirm against ByteBufBufferingSubscription).
     */
    @Nullable
    private final Duration heartbeatFrequency;

    /**
     * Optional buffering configuration. When {@code null}, a zero buffering window, a watermark of 0 bytes
     * and a default initial buffer capacity are used.
     */
    @Nullable
    private final ServerSentEventStreamBufferSettings bufferSettings;

    /* loaded from: input_file:ratpack/sse/ServerSentEvents$BuilderImpl.class */
    /**
     * Default {@link ServerSentEventsBuilder} implementation.
     *
     * <p>Collects the optional settings (buffering, empty-stream handling, keep-alive heartbeat) and
     * passes them to the private {@link ServerSentEvents} constructor in {@link #build(Publisher)}.
     * Every setter validates its arguments eagerly and throws {@link IllegalArgumentException} on
     * invalid input, so a misconfigured builder never produces a renderable.
     */
    private static class BuilderImpl implements ServerSentEventsBuilder {
        private boolean noContentOnEmpty;
        private ServerSentEventStreamBufferSettings bufferSettings;
        private Duration keepAliveHeartbeat;

        private BuilderImpl() {
        }

        /**
         * Enables buffering of encoded events.
         *
         * @param i the buffer size in bytes; must be at least 1
         * @param duration the buffering window; must not be negative
         * @return this builder
         * @throws IllegalArgumentException if {@code i < 1} or {@code duration} is negative
         */
        @Override
        public ServerSentEventsBuilder buffered(int i, Duration duration) {
            if (i < 1) {
                // Reject the value instead of merely reporting it: a non-positive size must never
                // reach the buffer settings.
                throw new IllegalArgumentException("numBytes must be > 0");
            }
            if (duration.isNegative()) {
                throw new IllegalArgumentException("duration must be zero or positive");
            }
            this.bufferSettings = new ServerSentEventStreamBufferSettings(i, duration);
            return this;
        }

        /**
         * Requests that an empty stream is answered without an event-stream body.
         *
         * @return this builder
         */
        @Override
        public ServerSentEventsBuilder noContentOnEmpty() {
            this.noContentOnEmpty = true;
            return this;
        }

        /**
         * Sets the keep-alive heartbeat interval.
         *
         * @param duration the heartbeat interval; must be strictly positive
         * @return this builder
         * @throws IllegalArgumentException if {@code duration} is zero or negative
         */
        @Override
        public ServerSentEventsBuilder keepAlive(Duration duration) {
            if (duration.isNegative() || duration.isZero()) {
                throw new IllegalArgumentException("duration must be positive");
            }
            this.keepAliveHeartbeat = duration;
            return this;
        }

        /**
         * Creates the renderable for the given event source using the settings collected so far.
         *
         * @param publisher the source of events to stream
         * @return a new {@link ServerSentEvents}
         */
        @Override
        public ServerSentEvents build(Publisher<? extends ServerSentEvent> publisher) {
            return new ServerSentEvents(publisher, this.noContentOnEmpty, this.keepAliveHeartbeat, this.bufferSettings);
        }
    }

    /**
     * Starts configuring a server sent events renderable.
     *
     * @return a fresh builder with no options set
     */
    public static ServerSentEventsBuilder builder() {
        ServerSentEventsBuilder freshBuilder = new BuilderImpl();
        return freshBuilder;
    }

    /**
     * Legacy factory: adapts the items of {@code publisher} to events via {@code action} and wraps
     * them in a renderable with no buffering, no heartbeat and no special empty-stream handling.
     *
     * @param publisher the source of items
     * @param action the action applied to the event created for each item
     * @param <T> the item type
     * @return a renderable streaming the converted events
     */
    @Deprecated
    public static <T> ServerSentEvents serverSentEvents(Publisher<T> publisher, Action<? super Event<T>> action) {
        Publisher<? extends ServerSentEvent> convertedEvents = DefaultEvent.toEvents(publisher, action);
        return new ServerSentEvents(convertedEvents, false, null, null);
    }

    /**
     * Stores the event source together with the rendering options; no validation is done here.
     *
     * @param publisher the source of events to stream
     * @param z whether an empty stream should be answered without an event-stream body
     * @param duration the heartbeat interval, or {@code null} if none was configured
     * @param serverSentEventStreamBufferSettings the buffering settings, or {@code null} if none were configured
     */
    private ServerSentEvents(Publisher<? extends ServerSentEvent> publisher, boolean z, @Nullable Duration duration, @Nullable ServerSentEventStreamBufferSettings serverSentEventStreamBufferSettings) {
        // Optional settings first, then the mandatory ones; the assignments are independent.
        this.bufferSettings = serverSentEventStreamBufferSettings;
        this.heartbeatFrequency = duration;
        this.noContentOnEmpty = z;
        this.publisher = publisher;
    }

    /**
     * Exposes the underlying events through the legacy {@link Event} type by mapping each
     * {@link ServerSentEvent} with {@code DefaultEvent.fromServerSentEvent}.
     *
     * @return a publisher of the converted events
     */
    @Nullable
    @Deprecated
    public Publisher<? extends Event<?>> getPublisher() {
        Publisher<? extends Event<?>> legacyEvents = Streams.map(publisher, DefaultEvent::fromServerSentEvent);
        return legacyEvents;
    }

    /**
     * Renders this event stream to the response of the given context.
     *
     * <p>Cache-disabling headers are always added first; the stream is then either rendered directly
     * or, when empty-stream handling is enabled, only after the first signal from the publisher.
     *
     * @param context the request context to render to
     * @throws Exception declared by {@link Renderable#render(Context)}
     */
    @Override
    public void render(Context context) throws Exception {
        Response response = context.getResponse();
        response.getHeaders().add(HttpHeaderConstants.CACHE_CONTROL, HttpHeaderConstants.NO_CACHE_FULL);
        response.getHeaders().add(HttpHeaderConstants.PRAGMA, HttpHeaderConstants.NO_CACHE);

        if (!this.noContentOnEmpty) {
            // Common case: start streaming right away.
            renderStream(context, this.publisher);
            return;
        }
        renderWithNoContentOnEmpty(context);
    }

    /**
     * Renders the stream only once the publisher has shown whether it is empty.
     *
     * <p>The publisher is subscribed to on a forked execution bound to the current execution's event
     * loop, and exactly one item is requested. The first signal decides the response:
     * <ul>
     *   <li>an event: the original execution is resumed and a stream made of that event followed by
     *       the remainder of the upstream is rendered via {@code renderStream};</li>
     *   <li>an error: the original execution is resumed and the error is passed to
     *       {@link Context#error(Throwable)};</li>
     *   <li>completion: the original execution is resumed and an empty 204 response is sent.</li>
     * </ul>
     * After the first event, all further signals are forwarded to the subscriber of the rendered stream.
     *
     * @param context the request context to render to
     */
    private void renderWithNoContentOnEmpty(Context context) {
        DefaultExecution execution = DefaultExecution.require();
        // The method reference null-checks `context` when it is evaluated, so no explicit check is needed.
        execution.delimit(context::error, continuation -> {
            Execution.fork().eventLoop(execution.getEventLoop()).start(forked -> {
                this.publisher.subscribe(new Subscriber<ServerSentEvent>() {
                    // Upstream subscription; later handed to the subscriber of the rendered stream.
                    private Subscription subscription;
                    // Subscriber of the rendered stream; null until the response stream has been subscribed to.
                    private Subscriber<? super ServerSentEvent> subscriber;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        this.subscription = subscription;
                        // Ask for a single item only: it tells us whether the stream is empty.
                        this.subscription.request(1L);
                    }

                    @Override
                    public void onNext(ServerSentEvent serverSentEvent) {
                        if (this.subscriber != null) {
                            this.subscriber.onNext(serverSentEvent);
                            return;
                        }
                        // First event: replay it, then let the downstream drive the upstream subscription directly.
                        Publisher<ServerSentEvent> first = Streams.publish(Collections.singleton(serverSentEvent));
                        Publisher<ServerSentEvent> rest = downstream -> {
                            this.subscriber = downstream;
                            downstream.onSubscribe(Objects.requireNonNull(this.subscription));
                        };
                        continuation.resume(() -> {
                            ServerSentEvents.this.renderStream(context, Streams.concat(Arrays.asList(first, rest)));
                        });
                    }

                    @Override
                    public void onError(Throwable th) {
                        if (this.subscriber != null) {
                            this.subscriber.onError(th);
                            return;
                        }
                        // Failed before any event was seen: report through the context's error handling.
                        continuation.resume(() -> {
                            context.error(th);
                        });
                    }

                    @Override
                    public void onComplete() {
                        if (this.subscriber != null) {
                            this.subscriber.onComplete();
                            return;
                        }
                        // Completed without emitting anything: answer with 204 No Content.
                        continuation.resume(() -> {
                            ServerSentEvents.emptyStream(context);
                        });
                    }
                });
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Streams the given events to the client as a chunked {@code text/event-stream} response.
     *
     * <p>Events are encoded into a {@link ByteBuf} taken from the channel's allocator by an anonymous
     * {@link ByteBufBufferingSubscription}, which is also given the channel's event loop, the
     * configured buffering window and heartbeat interval ({@link Duration#ZERO} when not configured)
     * and the shared heartbeat payload.
     *
     * @param context the request context to render to
     * @param publisher the events to encode and send
     */
    public void renderStream(Context context, Publisher<? extends ServerSentEvent> publisher) {
        Response response = context.getResponse();
        response.getHeaders().add(HttpHeaderConstants.CONTENT_TYPE, HttpHeaderConstants.TEXT_EVENT_STREAM_CHARSET_UTF_8);
        response.getHeaders().add(HttpHeaderConstants.TRANSFER_ENCODING, HttpHeaderConstants.CHUNKED);
        ByteBufAllocator allocator = context.getDirectChannelAccess().getChannel().alloc();
        EventLoop channelLoop = context.getDirectChannelAccess().getChannel().eventLoop();
        response.sendStream(downstream -> {
            // Fall back to zero durations when the corresponding option was not configured.
            Duration window = Duration.ZERO;
            if (this.bufferSettings != null) {
                window = this.bufferSettings.window;
            }
            Duration heartbeat = Duration.ZERO;
            if (this.heartbeatFrequency != null) {
                heartbeat = this.heartbeatFrequency;
            }
            new ByteBufBufferingSubscription<ServerSentEvent>(publisher, event -> event.close(), downstream, channelLoop, System::nanoTime, window, heartbeat, HEARTBEAT) {
                // Number of readable bytes at which the buffer is reported as full.
                final int watermark;
                // Initial capacity requested when a new buffer is allocated.
                final int bufferSize;
                // Buffer currently being filled; null when nothing is pending.
                ByteBuf buffer;

                {
                    ServerSentEventStreamBufferSettings settings = ServerSentEvents.this.bufferSettings;
                    if (settings == null) {
                        this.watermark = 0;
                        this.bufferSize = ServerConfig.DEFAULT_MAX_INITIAL_LINE_LENGTH;
                    } else {
                        this.watermark = settings.bytes;
                        this.bufferSize = settings.bytes;
                    }
                }

                /** Encodes the event into the pending buffer (allocating it on demand) and always closes the event. */
                public void buffer(ServerSentEvent serverSentEvent) {
                    if (null == this.buffer) {
                        this.buffer = allocator.buffer(this.bufferSize);
                    }
                    try {
                        ServerSentEventEncoder.encodeTo(serverSentEvent, this.buffer);
                    } finally {
                        serverSentEvent.close();
                    }
                }

                /** Reports whether the pending buffer holds at least {@code watermark} readable bytes. */
                protected boolean bufferIsFull() {
                    int pendingBytes = this.buffer.readableBytes();
                    return pendingBytes >= this.watermark;
                }

                /** Hands over the pending buffer and forgets it. */
                protected ByteBuf flush() {
                    ByteBuf pending = this.buffer;
                    this.buffer = null;
                    return pending;
                }

                /** Releases the pending buffer and forgets it. */
                protected void discard() {
                    ReferenceCountUtil.safeRelease(this.buffer);
                    this.buffer = null;
                }

                /** Reports whether there is no pending buffer. */
                protected boolean isEmpty() {
                    return null == this.buffer;
                }
            }.connect();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Answers the request with an empty 204 No Content response.
     *
     * @param context the request context to respond on
     */
    public static void emptyStream(Context context) {
        int noContent = HttpResponseStatus.NO_CONTENT.code();
        Response emptyResponse = context.getResponse().status(noContent);
        emptyResponse.send();
    }
}
