package ratpack.file.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import ratpack.exec.Blocking;
import ratpack.exec.Promise;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.internal.ManagedSubscription;

/* loaded from: input_file:ratpack/file/internal/FileReadingPublisher.class */
public class FileReadingPublisher implements TransformablePublisher<ByteBuf> {
    private final Promise<? extends AsynchronousFileChannel> file;
    private final int bufferSize;
    private final ByteBufAllocator allocator;
    private final long start;
    private final long stop;

    public FileReadingPublisher(Promise<? extends AsynchronousFileChannel> promise, ByteBufAllocator byteBufAllocator, int i, long j, long j2) {
        this.file = promise;
        this.bufferSize = i;
        this.allocator = byteBufAllocator;
        this.start = j;
        this.stop = j2 < 1 ? Long.MAX_VALUE : j2;
        if (i < 0) {
            throw new IllegalArgumentException("bufferSize must be 0 or positive");
        }
        if (j < 0) {
            throw new IllegalArgumentException("start must be 0 or positive");
        }
    }

    public void subscribe(Subscriber<? super ByteBuf> subscriber) {
        Promise<? extends AsynchronousFileChannel> promise = this.file;
        Objects.requireNonNull(subscriber);
        promise.onError(subscriber::onError).then(asynchronousFileChannel -> {
            subscriber.onSubscribe(new ManagedSubscription<ByteBuf>(subscriber, (v0) -> {
                v0.release();
            }) { // from class: ratpack.file.internal.FileReadingPublisher.1
                private final AtomicBoolean reading = new AtomicBoolean();
                private long position;

                {
                    this.position = FileReadingPublisher.this.start;
                }

                protected void onRequest(long j) {
                    read();
                }

                protected void onCancel() {
                }

                private void read() {
                    if (this.reading.compareAndSet(false, true)) {
                        doRead();
                    }
                }

                private void doRead() {
                    AsynchronousFileChannel asynchronousFileChannel = asynchronousFileChannel;
                    Promise.async(downstream -> {
                        int min = (int) Math.min(FileReadingPublisher.this.stop - this.position, FileReadingPublisher.this.bufferSize);
                        ByteBuf buffer = FileReadingPublisher.this.allocator.buffer(min, min);
                        asynchronousFileChannel.read(buffer.nioBuffer(0, min), this.position, buffer, new CompletionHandler<Integer, ByteBuf>() { // from class: ratpack.file.internal.FileReadingPublisher.1.1
                            @Override // java.nio.channels.CompletionHandler
                            public void completed(Integer num, ByteBuf byteBuf) {
                                byteBuf.writerIndex(Math.max(num.intValue(), 0));
                                downstream.success(byteBuf);
                            }

                            @Override // java.nio.channels.CompletionHandler
                            public void failed(Throwable th, ByteBuf byteBuf) {
                                byteBuf.release();
                                downstream.error(th);
                            }
                        });
                    }).onError(this::complete).then(byteBuf -> {
                        if (byteBuf.readableBytes() == 0) {
                            byteBuf.release();
                            complete(null);
                            return;
                        }
                        this.position += byteBuf.readableBytes();
                        emitNext(byteBuf);
                        if (this.position == FileReadingPublisher.this.stop) {
                            complete(null);
                            return;
                        }
                        if (hasDemand()) {
                            doRead();
                            return;
                        }
                        this.reading.set(false);
                        if (hasDemand()) {
                            read();
                        }
                    });
                }

                private void complete(Throwable th) {
                    Promise ofNull = th == null ? Promise.ofNull() : Promise.error(th);
                    AsynchronousFileChannel asynchronousFileChannel = asynchronousFileChannel;
                    Objects.requireNonNull(asynchronousFileChannel);
                    ofNull.close(Blocking.op(asynchronousFileChannel::close)).onError(this::emitError).then(obj -> {
                        emitComplete();
                    });
                }
            });
        });
    }
}
