/*
 * Decompiled with CFR 0.152.
 */
package ratpack.core.sse.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.core.sse.internal.Clock;
import ratpack.core.sse.internal.ServerSentEventStreamBufferSettings;
import ratpack.exec.stream.bytebuf.internal.ByteBufBufferingSubscription;

public class ServerSentEventStreamBuffer
implements Publisher<ByteBuf> {
    private final Publisher<? extends ByteBuf> upstream;
    private final ScheduledExecutorService executor;
    private final ServerSentEventStreamBufferSettings bufferSettings;
    private final Clock clock;
    private final ByteBufAllocator byteBufAllocator;

    public ServerSentEventStreamBuffer(Publisher<? extends ByteBuf> upstream, ScheduledExecutorService executor, ByteBufAllocator byteBufAllocator, ServerSentEventStreamBufferSettings bufferSettings, Clock clock) {
        this.upstream = upstream;
        this.executor = executor;
        this.byteBufAllocator = byteBufAllocator;
        this.bufferSettings = bufferSettings;
        this.clock = clock;
    }

    public void subscribe(Subscriber<? super ByteBuf> s) {
        final long flushFrequencyNanos = this.bufferSettings.window.toNanos();
        s.onSubscribe((Subscription)new ByteBufBufferingSubscription(this.upstream, s, this.byteBufAllocator, this.bufferSettings.events, this.bufferSettings.bytes){
            ScheduledFuture<?> checkFuture;
            long lastFlushAt;
            boolean needsFlush;

            protected void onCancel() {
                this.shutdown();
                super.onCancel();
            }

            private void shutdown() {
                if (this.checkFuture != null) {
                    this.checkFuture.cancel(false);
                    this.checkFuture = null;
                }
            }

            public void emitError(Throwable error) {
                this.shutdown();
                super.emitError(error);
            }

            public void emitComplete() {
                this.shutdown();
                super.emitComplete();
            }

            protected void onConnected() {
                if (flushFrequencyNanos > 0L) {
                    long now;
                    this.lastFlushAt = now = System.nanoTime();
                    this.scheduleCheck(now);
                }
            }

            private void check() {
                boolean flushIsDue;
                long nowNanos = ServerSentEventStreamBuffer.this.clock.nanoTime();
                long sinceLastFlushNanos = nowNanos - this.lastFlushAt;
                boolean bl = flushIsDue = sinceLastFlushNanos >= flushFrequencyNanos;
                if (flushIsDue) {
                    if (this.isEmpty()) {
                        this.needsFlush = true;
                        return;
                    }
                    this.flush();
                }
                this.scheduleCheck(nowNanos);
            }

            private void scheduleCheck(long nowNanos) {
                long scheduleFor = nowNanos - (this.lastFlushAt + flushFrequencyNanos);
                this.checkFuture = ServerSentEventStreamBuffer.this.executor.schedule(this::check, scheduleFor, TimeUnit.NANOSECONDS);
            }

            protected boolean shouldFlush() {
                return !this.isEmpty() && this.needsFlush || super.shouldFlush();
            }

            protected void flush() {
                super.flush();
                this.lastFlushAt = ServerSentEventStreamBuffer.this.clock.nanoTime();
                if (this.needsFlush) {
                    this.scheduleCheck(this.lastFlushAt);
                    this.needsFlush = false;
                }
            }
        });
    }
}

