/*
 * Decompiled with CFR 0.152.
 */
package ratpack.file.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCounted;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.exec.Blocking;
import ratpack.exec.Promise;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.internal.ManagedSubscription;

public class FileReadingPublisher
implements TransformablePublisher<ByteBuf> {
    private final Promise<? extends AsynchronousFileChannel> file;
    private final int bufferSize;
    private final ByteBufAllocator allocator;
    private final long start;
    private final long stop;

    public FileReadingPublisher(Promise<? extends AsynchronousFileChannel> file, ByteBufAllocator allocator, int bufferSize, long start, long stop) {
        this.file = file;
        this.bufferSize = bufferSize;
        this.allocator = allocator;
        this.start = start;
        long l = this.stop = stop < 1L ? Long.MAX_VALUE : stop;
        if (bufferSize < 0) {
            throw new IllegalArgumentException("bufferSize must be 0 or positive");
        }
        if (start < 0L) {
            throw new IllegalArgumentException("start must be 0 or positive");
        }
    }

    public void subscribe(Subscriber<? super ByteBuf> s) {
        this.file.onError(arg_0 -> s.onError(arg_0)).then(channel -> s.onSubscribe((Subscription)new ManagedSubscription<ByteBuf>(s, ReferenceCounted::release){
            private final AtomicBoolean reading;
            private long position;
            {
                super(arg0, arg1);
                this.reading = new AtomicBoolean();
                this.position = FileReadingPublisher.this.start;
            }

            protected void onRequest(long n) {
                this.read();
            }

            protected void onCancel() {
            }

            private void read() {
                if (this.reading.compareAndSet(false, true)) {
                    this.doRead();
                }
            }

            private void doRead() {
                Promise.async(down -> {
                    int size = (int)Math.min(FileReadingPublisher.this.stop - this.position, (long)FileReadingPublisher.this.bufferSize);
                    ByteBuf buffer = FileReadingPublisher.this.allocator.buffer(size, size);
                    channel.read(buffer.nioBuffer(0, size), this.position, buffer, new CompletionHandler<Integer, ByteBuf>(){

                        @Override
                        public void completed(Integer result, ByteBuf attachment) {
                            attachment.writerIndex(Math.max(result, 0));
                            down.success((Object)attachment);
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuf attachment) {
                            attachment.release();
                            down.error(exc);
                        }
                    });
                }).onError(this::complete).then(read -> {
                    if (read.readableBytes() == 0) {
                        read.release();
                        this.complete(null);
                    } else {
                        this.position += (long)read.readableBytes();
                        this.emitNext(read);
                        if (this.position == FileReadingPublisher.this.stop) {
                            this.complete(null);
                        } else if (this.hasDemand()) {
                            this.doRead();
                        } else {
                            this.reading.set(false);
                            if (this.hasDemand()) {
                                this.read();
                            }
                        }
                    }
                });
            }

            private void complete(Throwable error) {
                Promise p = error == null ? Promise.ofNull() : Promise.error((Throwable)error);
                p.close(Blocking.op(channel::close)).onError(arg_0 -> (this).emitError(arg_0)).then(v -> this.emitComplete());
            }
        }));
    }
}

