package ratpack.stream.internal;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscription;
import ratpack.func.Action;
import ratpack.func.Function;

/* loaded from: input_file:ratpack/stream/internal/PeriodicPublisher.class */
public class PeriodicPublisher<T> extends BufferingPublisher<T> {
    public PeriodicPublisher(ScheduledExecutorService scheduledExecutorService, Function<? super Integer, ? extends T> function, Duration duration) {
        super(Action.noop(), bufferedWriteStream -> {
            return new Subscription() { // from class: ratpack.stream.internal.PeriodicPublisher.1
                private final AtomicInteger counter = new AtomicInteger(0);
                private volatile ScheduledFuture future;

                public void request(long j) {
                    if (this.future == null) {
                        ScheduledExecutorService scheduledExecutorService2 = scheduledExecutorService;
                        Function function2 = function;
                        BufferedWriteStream bufferedWriteStream = bufferedWriteStream;
                        this.future = scheduledExecutorService2.scheduleWithFixedDelay(() -> {
                            try {
                                Object apply = function2.apply(Integer.valueOf(this.counter.getAndIncrement()));
                                if (apply != null) {
                                    bufferedWriteStream.item(apply);
                                } else {
                                    bufferedWriteStream.complete();
                                    cancel();
                                }
                            } catch (Exception e) {
                                bufferedWriteStream.error(e);
                                cancel();
                            }
                        }, 0L, duration.toNanos(), TimeUnit.NANOSECONDS);
                    }
                }

                public void cancel() {
                    if (this.future != null) {
                        this.future.cancel(false);
                    }
                }
            };
        });
    }
}
