package org.apache.dubbo.reactive.calls;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.reactive.ServerTripleReactorPublisher;
import org.apache.dubbo.reactive.ServerTripleReactorSubscriber;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/dubbo/reactive/calls/ReactorServerCalls.class */
public final class ReactorServerCalls {
    private ReactorServerCalls() {
    }

    public static <T, R> void oneToOne(T t, StreamObserver<R> streamObserver, Function<Mono<T>, Mono<R>> function) {
        try {
            function.apply(Mono.just(t)).subscribe(obj -> {
                streamObserver.onNext(obj);
                streamObserver.onCompleted();
            }, th -> {
                doOnResponseHasException(th, streamObserver);
            }, () -> {
                doOnResponseHasException(TriRpcStatus.NOT_FOUND.asException(), streamObserver);
            });
        } catch (Throwable th2) {
            doOnResponseHasException(th2, streamObserver);
        }
    }

    public static <T, R> CompletableFuture<List<R>> oneToMany(T t, StreamObserver<R> streamObserver, Function<Mono<T>, Flux<R>> function) {
        try {
            CallStreamObserver<T> callStreamObserver = (CallStreamObserver) streamObserver;
            Flux<R> apply = function.apply(Mono.just(t));
            ServerTripleReactorSubscriber serverTripleReactorSubscriber = new ServerTripleReactorSubscriber(callStreamObserver);
            ((ServerTripleReactorSubscriber) apply.subscribeWith(serverTripleReactorSubscriber)).subscribe(callStreamObserver);
            return serverTripleReactorSubscriber.getExecutionFuture();
        } catch (Throwable th) {
            doOnResponseHasException(th, streamObserver);
            CompletableFuture<List<R>> completableFuture = new CompletableFuture<>();
            completableFuture.completeExceptionally(th);
            return completableFuture;
        }
    }

    public static <T, R> StreamObserver<T> manyToOne(StreamObserver<R> streamObserver, Function<Flux<T>, Mono<R>> function) {
        ServerTripleReactorPublisher serverTripleReactorPublisher = new ServerTripleReactorPublisher((CallStreamObserver) streamObserver);
        try {
            Mono<R> apply = function.apply(Flux.from(serverTripleReactorPublisher));
            Consumer consumer = obj -> {
                if (serverTripleReactorPublisher.isCancelled()) {
                    return;
                }
                streamObserver.onNext(obj);
            };
            Consumer consumer2 = th -> {
                if (serverTripleReactorPublisher.isCancelled()) {
                    return;
                }
                streamObserver.onError(th);
            };
            Objects.requireNonNull(streamObserver);
            apply.subscribe(consumer, consumer2, streamObserver::onCompleted);
            serverTripleReactorPublisher.startRequest();
        } catch (Throwable th2) {
            streamObserver.onError(th2);
        }
        return serverTripleReactorPublisher;
    }

    public static <T, R> StreamObserver<T> manyToMany(StreamObserver<R> streamObserver, Function<Flux<T>, Flux<R>> function) {
        ServerTripleReactorPublisher serverTripleReactorPublisher = new ServerTripleReactorPublisher((CallStreamObserver) streamObserver);
        try {
            ((ServerTripleReactorSubscriber) function.apply(Flux.from(serverTripleReactorPublisher)).subscribeWith(new ServerTripleReactorSubscriber())).subscribe((CallStreamObserver) streamObserver);
            serverTripleReactorPublisher.startRequest();
        } catch (Throwable th) {
            streamObserver.onError(th);
        }
        return serverTripleReactorPublisher;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void doOnResponseHasException(Throwable th, StreamObserver<?> streamObserver) {
        streamObserver.onError(TriRpcStatus.getStatus(th).asException());
    }
}
