package ratpack.core.sse.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import ratpack.exec.stream.bytebuf.internal.ByteBufBufferingSubscription;

/* loaded from: input_file:ratpack/core/sse/internal/ServerSentEventStreamBuffer.class */
public class ServerSentEventStreamBuffer implements Publisher<ByteBuf> {
    private final Publisher<? extends ByteBuf> upstream;
    private final ScheduledExecutorService executor;
    private final ServerSentEventStreamBufferSettings bufferSettings;
    private final Clock clock;
    private final ByteBufAllocator byteBufAllocator;

    public ServerSentEventStreamBuffer(Publisher<? extends ByteBuf> publisher, ScheduledExecutorService scheduledExecutorService, ByteBufAllocator byteBufAllocator, ServerSentEventStreamBufferSettings serverSentEventStreamBufferSettings, Clock clock) {
        this.upstream = publisher;
        this.executor = scheduledExecutorService;
        this.byteBufAllocator = byteBufAllocator;
        this.bufferSettings = serverSentEventStreamBufferSettings;
        this.clock = clock;
    }

    public void subscribe(Subscriber<? super ByteBuf> subscriber) {
        final long nanos = this.bufferSettings.window.toNanos();
        subscriber.onSubscribe(new ByteBufBufferingSubscription(this.upstream, subscriber, this.byteBufAllocator, this.bufferSettings.events, this.bufferSettings.bytes) { // from class: ratpack.core.sse.internal.ServerSentEventStreamBuffer.1
            ScheduledFuture<?> checkFuture;
            long lastFlushAt;
            boolean needsFlush;

            protected void onCancel() {
                shutdown();
                super.onCancel();
            }

            private void shutdown() {
                if (this.checkFuture != null) {
                    this.checkFuture.cancel(false);
                    this.checkFuture = null;
                }
            }

            public void emitError(Throwable th) {
                shutdown();
                super.emitError(th);
            }

            public void emitComplete() {
                shutdown();
                super.emitComplete();
            }

            protected void onConnected() {
                if (nanos > 0) {
                    long nanoTime = System.nanoTime();
                    this.lastFlushAt = nanoTime;
                    scheduleCheck(nanoTime);
                }
            }

            private void check() {
                long nanoTime = ServerSentEventStreamBuffer.this.clock.nanoTime();
                if (nanoTime - this.lastFlushAt >= nanos) {
                    if (isEmpty()) {
                        this.needsFlush = true;
                        return;
                    }
                    flush();
                }
                scheduleCheck(nanoTime);
            }

            private void scheduleCheck(long j) {
                this.checkFuture = ServerSentEventStreamBuffer.this.executor.schedule(this::check, j - (this.lastFlushAt + nanos), TimeUnit.NANOSECONDS);
            }

            protected boolean shouldFlush() {
                return (!isEmpty() && this.needsFlush) || super.shouldFlush();
            }

            protected void flush() {
                super.flush();
                this.lastFlushAt = ServerSentEventStreamBuffer.this.clock.nanoTime();
                if (this.needsFlush) {
                    scheduleCheck(this.lastFlushAt);
                    this.needsFlush = false;
                }
            }
        });
    }
}
