package org.springframework.data.mongodb.gridfs;

import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

/* loaded from: input_file:org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.class */
class DataBufferPublisherAdapter {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter$State.class */
    public static class State {
        private static final AtomicLongFieldUpdater<State> DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, "demand");
        private static final AtomicIntegerFieldUpdater<State> STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, "state");
        private static final AtomicIntegerFieldUpdater<State> READ = AtomicIntegerFieldUpdater.newUpdater(State.class, "read");
        private static final int STATE_OPEN = 0;
        private static final int STATE_CLOSED = 1;
        private static final int READ_NONE = 0;
        private static final int READ_IN_PROGRESS = 1;
        final AsyncInputStream inputStream;
        final DataBufferFactory dataBufferFactory;
        volatile long demand;
        volatile int state = 0;
        volatile int read = 0;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter$State$BufferCoreSubscriber.class */
        public class BufferCoreSubscriber implements CoreSubscriber<Integer> {
            private final FluxSink<DataBuffer> sink;
            private final DataBuffer dataBuffer;
            private final ByteBuffer intermediate;

            BufferCoreSubscriber(FluxSink<DataBuffer> fluxSink, DataBuffer dataBuffer, ByteBuffer byteBuffer) {
                this.sink = fluxSink;
                this.dataBuffer = dataBuffer;
                this.intermediate = byteBuffer;
            }

            public Context currentContext() {
                return this.sink.currentContext();
            }

            public void onSubscribe(Subscription subscription) {
                subscription.request(1L);
            }

            public void onNext(Integer num) {
                if (State.this.isClosed()) {
                    State.this.onReadDone();
                    DataBufferUtils.release(this.dataBuffer);
                    Operators.onNextDropped(this.dataBuffer, this.sink.currentContext());
                    return;
                }
                this.intermediate.flip();
                this.dataBuffer.write(new ByteBuffer[]{this.intermediate});
                this.sink.next(this.dataBuffer);
                State.this.decrementDemand();
                try {
                    if (num.intValue() == -1) {
                        this.sink.complete();
                    }
                } finally {
                    State.this.onReadDone();
                }
            }

            public void onError(Throwable th) {
                if (State.this.isClosed()) {
                    Operators.onErrorDropped(th, this.sink.currentContext());
                    return;
                }
                State.this.onReadDone();
                DataBufferUtils.release(this.dataBuffer);
                Operators.onNextDropped(this.dataBuffer, this.sink.currentContext());
                this.sink.error(th);
            }

            public void onComplete() {
                if (State.this.onShouldRead()) {
                    State.this.emitNext(this.sink);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void request(FluxSink<DataBuffer> fluxSink, long j) {
            Operators.addCap(DEMAND, this, j);
            if (onShouldRead()) {
                emitNext(fluxSink);
            }
        }

        boolean onShouldRead() {
            return !isClosed() && getDemand() > 0 && onWantRead();
        }

        boolean onWantRead() {
            return READ.compareAndSet(this, 0, 1);
        }

        boolean onReadDone() {
            return READ.compareAndSet(this, 1, 0);
        }

        long getDemand() {
            return DEMAND.get(this);
        }

        boolean decrementDemand() {
            return DEMAND.decrementAndGet(this) > 0;
        }

        void close() {
            STATE.compareAndSet(this, 0, 1);
        }

        boolean isClosed() {
            return STATE.get(this) == 1;
        }

        void emitNext(FluxSink<DataBuffer> fluxSink) {
            DataBuffer allocateBuffer = this.dataBufferFactory.allocateBuffer();
            ByteBuffer allocate = ByteBuffer.allocate(allocateBuffer.capacity());
            try {
                Mono.from(this.inputStream.read(allocate)).subscribe(new BufferCoreSubscriber(fluxSink, allocateBuffer, allocate));
            } catch (Exception e) {
                fluxSink.error(e);
            }
        }

        public State(AsyncInputStream asyncInputStream, DataBufferFactory dataBufferFactory) {
            this.inputStream = asyncInputStream;
            this.dataBufferFactory = dataBufferFactory;
        }
    }

    DataBufferPublisherAdapter() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Flux<DataBuffer> createBinaryStream(AsyncInputStream asyncInputStream, DataBufferFactory dataBufferFactory) {
        State state = new State(asyncInputStream, dataBufferFactory);
        return Flux.usingWhen(Mono.just(asyncInputStream), asyncInputStream2 -> {
            return Flux.create(fluxSink -> {
                state.getClass();
                fluxSink.onDispose(state::close);
                state.getClass();
                fluxSink.onCancel(state::close);
                fluxSink.onRequest(j -> {
                    state.request(fluxSink, j);
                });
            });
        }, (v0) -> {
            return v0.close();
        }, (v0) -> {
            return v0.close();
        }, (v0) -> {
            return v0.close();
        }).concatMap((v0) -> {
            return Flux.just(v0);
        }, 1);
    }
}
