package com.couchbase.client.core.protostellar;

import com.couchbase.client.core.CoreProtostellar;
import com.couchbase.client.core.Timer;
import com.couchbase.client.core.api.kv.CoreAsyncResponse;
import com.couchbase.client.core.deps.io.grpc.stub.StreamObserver;
import com.couchbase.client.core.endpoint.ProtostellarEndpoint;
import com.couchbase.client.core.msg.CancellationReason;
import com.couchbase.client.core.retry.ProtostellarRequestBehaviour;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:com/couchbase/client/core/protostellar/CoreProtostellarAccessorsStreaming.class */
public class CoreProtostellarAccessorsStreaming {
    private CoreProtostellarAccessorsStreaming() {
    }

    public static <TGrpcRequest, TGrpcResponse> List<TGrpcResponse> blocking(CoreProtostellar coreProtostellar, ProtostellarRequest<TGrpcRequest> protostellarRequest, BiConsumer<ProtostellarEndpoint, StreamObserver<TGrpcResponse>> biConsumer, Function<Throwable, ProtostellarRequestBehaviour> function) {
        return (List) async(coreProtostellar, protostellarRequest, biConsumer, function).toBlocking();
    }

    public static <TGrpcRequest, TGrpcResponse> CoreAsyncResponse<List<TGrpcResponse>> async(CoreProtostellar coreProtostellar, ProtostellarRequest<TGrpcRequest> protostellarRequest, BiConsumer<ProtostellarEndpoint, StreamObserver<TGrpcResponse>> biConsumer, Function<Throwable, ProtostellarRequestBehaviour> function) {
        CompletableFuture completableFuture = new CompletableFuture();
        CoreAsyncResponse<List<TGrpcResponse>> coreAsyncResponse = new CoreAsyncResponse<>(completableFuture, () -> {
        });
        asyncInternal(completableFuture, coreProtostellar, protostellarRequest, biConsumer, function);
        return coreAsyncResponse;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <TGrpcRequest, TGrpcResponse> void asyncInternal(final CompletableFuture<List<TGrpcResponse>> completableFuture, final CoreProtostellar coreProtostellar, final ProtostellarRequest<TGrpcRequest> protostellarRequest, final BiConsumer<ProtostellarEndpoint, StreamObserver<TGrpcResponse>> biConsumer, final Function<Throwable, ProtostellarRequestBehaviour> function) {
        if (CoreProtostellarUtil.handleShutdownAsync(coreProtostellar, completableFuture, protostellarRequest)) {
            return;
        }
        ProtostellarEndpoint endpoint = coreProtostellar.endpoint();
        final ArrayList arrayList = new ArrayList();
        biConsumer.accept(endpoint, new StreamObserver<TGrpcResponse>() { // from class: com.couchbase.client.core.protostellar.CoreProtostellarAccessorsStreaming.1
            @Override // com.couchbase.client.core.deps.io.grpc.stub.StreamObserver
            public void onNext(TGrpcResponse tgrpcresponse) {
                arrayList.add(tgrpcresponse);
            }

            @Override // com.couchbase.client.core.deps.io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                ProtostellarRequestBehaviour protostellarRequestBehaviour = (ProtostellarRequestBehaviour) function.apply(th);
                if (protostellarRequestBehaviour.retryDuration() == null) {
                    if (protostellarRequest.completed()) {
                        return;
                    }
                    protostellarRequest.raisedResponseToUser(protostellarRequestBehaviour.exception());
                    completableFuture.completeExceptionally(protostellarRequestBehaviour.exception());
                    return;
                }
                Timer timer = coreProtostellar.context().environment().timer();
                CompletableFuture completableFuture2 = completableFuture;
                CoreProtostellar coreProtostellar2 = coreProtostellar;
                ProtostellarRequest protostellarRequest2 = protostellarRequest;
                BiConsumer biConsumer2 = biConsumer;
                Function function2 = function;
                if (timer.schedule(() -> {
                    CoreProtostellarAccessorsStreaming.asyncInternal(completableFuture2, coreProtostellar2, protostellarRequest2, biConsumer2, function2);
                }, protostellarRequestBehaviour.retryDuration(), true) == null) {
                    RuntimeException exception = protostellarRequest.cancel(CancellationReason.TOO_MANY_REQUESTS_IN_RETRY).exception();
                    if (protostellarRequest.completed()) {
                        return;
                    }
                    protostellarRequest.raisedResponseToUser(exception);
                    completableFuture.completeExceptionally(exception);
                }
            }

            @Override // com.couchbase.client.core.deps.io.grpc.stub.StreamObserver
            public void onCompleted() {
                completableFuture.complete(arrayList);
            }
        });
    }

    public static <TGrpcRequest, TGrpcResponse> Flux<TGrpcResponse> reactive(CoreProtostellar coreProtostellar, ProtostellarRequest<TGrpcRequest> protostellarRequest, BiConsumer<ProtostellarEndpoint, StreamObserver<TGrpcResponse>> biConsumer, Function<Throwable, ProtostellarRequestBehaviour> function) {
        return Flux.defer(() -> {
            Sinks.Many onBackpressureBuffer = Sinks.many().unicast().onBackpressureBuffer();
            reactiveInternal(onBackpressureBuffer, coreProtostellar, protostellarRequest, biConsumer, function);
            return onBackpressureBuffer.asFlux();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <TGrpcRequest, TGrpcResponse> void reactiveInternal(final Sinks.Many<TGrpcResponse> many, final CoreProtostellar coreProtostellar, final ProtostellarRequest<TGrpcRequest> protostellarRequest, final BiConsumer<ProtostellarEndpoint, StreamObserver<TGrpcResponse>> biConsumer, final Function<Throwable, ProtostellarRequestBehaviour> function) {
        if (CoreProtostellarUtil.handleShutdownReactive((Sinks.Many) many, coreProtostellar, (ProtostellarRequest<?>) protostellarRequest)) {
            return;
        }
        biConsumer.accept(coreProtostellar.endpoint(), new StreamObserver<TGrpcResponse>() { // from class: com.couchbase.client.core.protostellar.CoreProtostellarAccessorsStreaming.2
            @Override // com.couchbase.client.core.deps.io.grpc.stub.StreamObserver
            public void onNext(TGrpcResponse tgrpcresponse) {
                many.tryEmitNext(tgrpcresponse).orThrow();
            }

            @Override // com.couchbase.client.core.deps.io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                ProtostellarRequestBehaviour protostellarRequestBehaviour = (ProtostellarRequestBehaviour) function.apply(th);
                if (protostellarRequestBehaviour.retryDuration() == null) {
                    if (protostellarRequest.completed()) {
                        return;
                    }
                    protostellarRequest.raisedResponseToUser(protostellarRequestBehaviour.exception());
                    many.tryEmitError(protostellarRequestBehaviour.exception()).orThrow();
                    return;
                }
                Timer timer = coreProtostellar.context().environment().timer();
                Sinks.Many many2 = many;
                CoreProtostellar coreProtostellar2 = coreProtostellar;
                ProtostellarRequest protostellarRequest2 = protostellarRequest;
                BiConsumer biConsumer2 = biConsumer;
                Function function2 = function;
                if (timer.schedule(() -> {
                    CoreProtostellarAccessorsStreaming.reactiveInternal(many2, coreProtostellar2, protostellarRequest2, biConsumer2, function2);
                }, protostellarRequestBehaviour.retryDuration(), true) == null) {
                    RuntimeException exception = protostellarRequest.cancel(CancellationReason.TOO_MANY_REQUESTS_IN_RETRY).exception();
                    if (protostellarRequest.completed()) {
                        return;
                    }
                    protostellarRequest.raisedResponseToUser(exception);
                    many.tryEmitError(exception).orThrow();
                }
            }

            @Override // com.couchbase.client.core.deps.io.grpc.stub.StreamObserver
            public void onCompleted() {
                many.tryEmitComplete();
            }
        });
    }
}
