package org.asynchttpclient.request.body.generator;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.asynchttpclient.request.body.Body;
import org.asynchttpclient.request.body.generator.FeedableBodyGenerator;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/asynchttpclient/request/body/generator/ReactiveStreamsBodyGenerator.class */
public class ReactiveStreamsBodyGenerator implements FeedableBodyGenerator {
    private static final ByteBuffer EMPTY = ByteBuffer.wrap("".getBytes());
    private final Publisher<ByteBuffer> publisher;
    private final AtomicReference<FeedableBodyGenerator.FeedListener> feedListener = new AtomicReference<>(null);
    private final FeedableBodyGenerator feedableBodyGenerator = new SimpleFeedableBodyGenerator();

    /* loaded from: input_file:org/asynchttpclient/request/body/generator/ReactiveStreamsBodyGenerator$SimpleSubscriber.class */
    private class SimpleSubscriber implements Subscriber<ByteBuffer> {
        private final Logger LOGGER = LoggerFactory.getLogger(SimpleSubscriber.class);
        private final FeedableBodyGenerator feeder;
        private volatile Subscription subscription;

        public SimpleSubscriber(FeedableBodyGenerator feedableBodyGenerator) {
            this.feeder = feedableBodyGenerator;
        }

        public void onSubscribe(Subscription subscription) {
            if (subscription == null) {
                throw null;
            }
            if (this.subscription != null) {
                subscription.cancel();
            } else {
                this.subscription = subscription;
                this.subscription.request(Long.MAX_VALUE);
            }
        }

        public void onNext(ByteBuffer byteBuffer) {
            if (byteBuffer == null) {
                throw null;
            }
            try {
                this.feeder.feed(byteBuffer, false);
            } catch (Exception e) {
                this.LOGGER.error("Exception occurred while processing element in stream.", e);
                this.subscription.cancel();
            }
        }

        public void onError(Throwable th) {
            if (th == null) {
                throw null;
            }
            this.LOGGER.debug("Error occurred while consuming body stream.", th);
            FeedableBodyGenerator.FeedListener feedListener = (FeedableBodyGenerator.FeedListener) ReactiveStreamsBodyGenerator.this.feedListener.get();
            if (feedListener != null) {
                feedListener.onError(th);
            }
        }

        public void onComplete() {
            try {
                this.feeder.feed(ReactiveStreamsBodyGenerator.EMPTY, true);
            } catch (Exception e) {
                this.LOGGER.info("Ignoring exception occurred while completing stream processing.", e);
                this.subscription.cancel();
            }
        }
    }

    /* loaded from: input_file:org/asynchttpclient/request/body/generator/ReactiveStreamsBodyGenerator$StreamedBody.class */
    private class StreamedBody implements Body {
        private final AtomicBoolean initialized = new AtomicBoolean(false);
        private final SimpleSubscriber subscriber;
        private final Body body;

        public StreamedBody(Publisher<ByteBuffer> publisher, FeedableBodyGenerator feedableBodyGenerator) {
            this.body = feedableBodyGenerator.createBody();
            this.subscriber = new SimpleSubscriber(feedableBodyGenerator);
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.body.close();
        }

        @Override // org.asynchttpclient.request.body.Body
        public long getContentLength() {
            return this.body.getContentLength();
        }

        @Override // org.asynchttpclient.request.body.Body
        public Body.State read(ByteBuffer byteBuffer) throws IOException {
            if (this.initialized.compareAndSet(false, true)) {
                ReactiveStreamsBodyGenerator.this.publisher.subscribe(this.subscriber);
            }
            return this.body.read(byteBuffer);
        }
    }

    public ReactiveStreamsBodyGenerator(Publisher<ByteBuffer> publisher) {
        this.publisher = publisher;
    }

    public Publisher<ByteBuffer> getPublisher() {
        return this.publisher;
    }

    @Override // org.asynchttpclient.request.body.generator.FeedableBodyGenerator
    public void feed(ByteBuffer byteBuffer, boolean z) {
        this.feedableBodyGenerator.feed(byteBuffer, z);
    }

    @Override // org.asynchttpclient.request.body.generator.FeedableBodyGenerator
    public void writeChunkBoundaries() {
        this.feedableBodyGenerator.writeChunkBoundaries();
    }

    @Override // org.asynchttpclient.request.body.generator.FeedableBodyGenerator
    public void setListener(FeedableBodyGenerator.FeedListener feedListener) {
        this.feedListener.set(feedListener);
        this.feedableBodyGenerator.setListener(feedListener);
    }

    @Override // org.asynchttpclient.request.body.generator.BodyGenerator
    public Body createBody() {
        return new StreamedBody(this.publisher, this.feedableBodyGenerator);
    }
}
