package com.azure.core.http.vertx.implementation;

import com.azure.core.http.HttpResponse;
import com.azure.core.util.ProgressReporter;
import io.netty.buffer.Unpooled;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/* loaded from: input_file:com/azure/core/http/vertx/implementation/VertxRequestWriteSubscriber.class */
public final class VertxRequestWriteSubscriber implements Subscriber<ByteBuffer> {
    private final HttpClientRequest request;
    private final Promise<HttpResponse> promise;
    private final ProgressReporter progressReporter;
    private final ContextView contextView;
    private volatile Subscription subscription;
    private volatile State state = State.UNINITIALIZED;
    private volatile Throwable error;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/http/vertx/implementation/VertxRequestWriteSubscriber$State.class */
    public enum State {
        UNINITIALIZED(0),
        WRITING(1),
        COMPLETE(2),
        ERROR(3);

        private final int code;

        State(int i) {
            this.code = i;
        }
    }

    public VertxRequestWriteSubscriber(HttpClientRequest httpClientRequest, Promise<HttpResponse> promise, ProgressReporter progressReporter, ContextView contextView) {
        this.request = httpClientRequest.exceptionHandler(this::onError).drainHandler(r3 -> {
            requestNext();
        });
        this.promise = promise;
        this.progressReporter = progressReporter;
        this.contextView = contextView;
    }

    public void onSubscribe(Subscription subscription) {
        if (Operators.validate(this.subscription, subscription)) {
            this.subscription = subscription;
            subscription.request(1L);
        }
    }

    public void onNext(ByteBuffer byteBuffer) {
        try {
            if (this.state == State.WRITING) {
                onErrorInternal(new IllegalStateException("Received onNext while processing another write operation."));
            } else {
                this.state = State.WRITING;
                write(byteBuffer);
            }
        } catch (Exception e) {
            onErrorInternal(e);
        }
    }

    private void write(ByteBuffer byteBuffer) {
        int remaining = byteBuffer.remaining();
        this.request.write(Buffer.buffer(Unpooled.wrappedBuffer(byteBuffer)), asyncResult -> {
            State state = this.state;
            if (state == State.WRITING) {
                this.state = State.UNINITIALIZED;
            }
            if (!asyncResult.succeeded()) {
                this.state = State.ERROR;
                resetRequest(asyncResult.cause());
                return;
            }
            if (this.progressReporter != null) {
                this.progressReporter.reportProgress(remaining);
            }
            if (state == State.WRITING) {
                if (this.request.writeQueueFull()) {
                    return;
                }
                requestNext();
            } else if (state == State.COMPLETE) {
                endRequest();
            } else if (state == State.ERROR) {
                resetRequest(this.error);
            }
        });
    }

    private void requestNext() {
        if (this.state == State.UNINITIALIZED) {
            this.subscription.request(1L);
        }
    }

    public void onError(Throwable th) {
        onErrorInternal(th);
    }

    private void onErrorInternal(Throwable th) {
        State state = this.state;
        if (state.code >= 2) {
            Operators.onErrorDropped(th, Context.of(this.contextView));
        }
        this.state = State.ERROR;
        if (state != State.WRITING) {
            resetRequest(th);
        } else {
            this.error = th;
        }
    }

    private void resetRequest(Throwable th) {
        this.subscription.cancel();
        this.promise.fail(th);
        this.request.reset(0L, th);
    }

    public void onComplete() {
        State state = this.state;
        if (state.code >= 2) {
            return;
        }
        this.state = State.COMPLETE;
        if (state != State.WRITING) {
            endRequest();
        }
    }

    private void endRequest() {
        Future end = this.request.end();
        Promise<HttpResponse> promise = this.promise;
        Objects.requireNonNull(promise);
        end.onFailure(promise::fail);
    }
}
