package ratpack.core.sse.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:ratpack/core/sse/internal/ServerSentEventStreamKeepAlive.class */
public class ServerSentEventStreamKeepAlive implements Publisher<ByteBuf> {
    private static final ByteBuf HEARTBEAT = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(": keepalive heartbeat\n\n".getBytes(StandardCharsets.UTF_8)));
    private final Publisher<? extends ByteBuf> upstream;
    private final ScheduledExecutorService executor;
    private final Duration heartBeatFrequency;
    private final Clock clock;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: ratpack.core.sse.internal.ServerSentEventStreamKeepAlive$1, reason: invalid class name */
    /* loaded from: input_file:ratpack/core/sse/internal/ServerSentEventStreamKeepAlive$1.class */
    public class AnonymousClass1 implements Subscriber<ByteBuf> {
        private Subscription subscription;
        private ScheduledFuture<?> checkFuture;
        long lastWriteAt;
        long demand;
        long demandSurplusSent;
        boolean needsHeartbeat;
        final /* synthetic */ Subscriber val$downstream;
        final /* synthetic */ long val$heartbeatFrequencyNanos;

        AnonymousClass1(Subscriber subscriber, long j) {
            this.val$downstream = subscriber;
            this.val$heartbeatFrequencyNanos = j;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void stop() {
            if (this.checkFuture != null) {
                this.checkFuture.cancel(false);
                this.checkFuture = null;
            }
        }

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            this.val$downstream.onSubscribe(new Subscription() { // from class: ratpack.core.sse.internal.ServerSentEventStreamKeepAlive.1.1
                private void scheduleCheck(long j) {
                    AnonymousClass1.this.checkFuture = ServerSentEventStreamKeepAlive.this.executor.schedule(this::check, j, TimeUnit.NANOSECONDS);
                }

                private void check() {
                    long nanoTime = ServerSentEventStreamKeepAlive.this.clock.nanoTime();
                    long j = AnonymousClass1.this.lastWriteAt + AnonymousClass1.this.val$heartbeatFrequencyNanos;
                    if (j > nanoTime) {
                        scheduleCheck(j - nanoTime);
                    } else if (AnonymousClass1.this.demand > AnonymousClass1.this.demandSurplusSent) {
                        emitHeartbeat();
                    } else {
                        AnonymousClass1.this.needsHeartbeat = true;
                    }
                }

                private void emitHeartbeat() {
                    AnonymousClass1.this.needsHeartbeat = false;
                    AnonymousClass1.this.demandSurplusSent++;
                    AnonymousClass1.this.lastWriteAt = ServerSentEventStreamKeepAlive.this.clock.nanoTime();
                    AnonymousClass1.this.val$downstream.onNext(ServerSentEventStreamKeepAlive.HEARTBEAT.slice());
                    scheduleCheck(AnonymousClass1.this.val$heartbeatFrequencyNanos);
                }

                public void request(long j) {
                    if (AnonymousClass1.this.checkFuture == null) {
                        AnonymousClass1.this.lastWriteAt = ServerSentEventStreamKeepAlive.this.clock.nanoTime();
                        scheduleCheck(AnonymousClass1.this.val$heartbeatFrequencyNanos);
                    }
                    long min = Math.min(j, AnonymousClass1.this.demandSurplusSent);
                    AnonymousClass1.this.demandSurplusSent -= min;
                    long j2 = j - min;
                    AnonymousClass1.this.demand += j2;
                    if (AnonymousClass1.this.needsHeartbeat && AnonymousClass1.this.demand > AnonymousClass1.this.demandSurplusSent) {
                        emitHeartbeat();
                    }
                    if (j2 > 0) {
                        AnonymousClass1.this.subscription.request(j2);
                    }
                }

                public void cancel() {
                    AnonymousClass1.this.stop();
                    AnonymousClass1.this.subscription.cancel();
                }
            });
        }

        public void onNext(ByteBuf byteBuf) {
            this.demand--;
            this.needsHeartbeat = false;
            this.lastWriteAt = ServerSentEventStreamKeepAlive.this.clock.nanoTime();
            this.val$downstream.onNext(byteBuf.touch());
        }

        public void onError(Throwable th) {
            stop();
            this.val$downstream.onError(th);
        }

        public void onComplete() {
            stop();
            this.val$downstream.onComplete();
        }
    }

    public ServerSentEventStreamKeepAlive(Publisher<? extends ByteBuf> publisher, ScheduledExecutorService scheduledExecutorService, Duration duration, Clock clock) {
        this.upstream = publisher;
        this.executor = scheduledExecutorService;
        this.heartBeatFrequency = duration;
        this.clock = clock;
    }

    public void subscribe(Subscriber<? super ByteBuf> subscriber) {
        this.upstream.subscribe(new AnonymousClass1(subscriber, this.heartBeatFrequency.toNanos()));
    }
}
