package com.tencent.trpc.proto.standard.stream.client;

import com.tencent.trpc.core.common.config.BackendConfig;
import com.tencent.trpc.core.common.config.ConsumerConfig;
import com.tencent.trpc.core.common.config.ProtocolConfig;
import com.tencent.trpc.core.exception.TRpcException;
import com.tencent.trpc.core.logger.Logger;
import com.tencent.trpc.core.logger.LoggerFactory;
import com.tencent.trpc.core.rpc.ConsumerInvoker;
import com.tencent.trpc.core.rpc.InvokeMode;
import com.tencent.trpc.core.rpc.Request;
import com.tencent.trpc.core.rpc.Response;
import com.tencent.trpc.core.rpc.RpcContext;
import com.tencent.trpc.core.rpc.RpcInvocation;
import com.tencent.trpc.core.rpc.def.DefResponse;
import com.tencent.trpc.core.stream.transport.ClientTransport;
import com.tencent.trpc.core.stream.transport.RpcConnection;
import com.tencent.trpc.core.utils.RpcContextUtils;
import com.tencent.trpc.proto.standard.common.TRPCProtocol;
import com.tencent.trpc.proto.standard.stream.TRpcStreamRequester;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/* loaded from: input_file:com/tencent/trpc/proto/standard/stream/client/StreamConsumerInvoker.class */
public class StreamConsumerInvoker<T> implements ConsumerInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(StreamConsumerInvoker.class);
    private static final int STREAM_CLOSE_COUNT_BOTH_DIRECTION = 2;
    private final ConsumerConfig<T> consumerConfig;
    private final BackendConfig backendConfig;
    private final ProtocolConfig protocolConfig;
    private final ClientTransport clientTransport;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.tencent.trpc.proto.standard.stream.client.StreamConsumerInvoker$1, reason: invalid class name */
    /* loaded from: input_file:com/tencent/trpc/proto/standard/stream/client/StreamConsumerInvoker$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$tencent$trpc$core$rpc$InvokeMode = new int[InvokeMode.values().length];

        static {
            try {
                $SwitchMap$com$tencent$trpc$core$rpc$InvokeMode[InvokeMode.SERVER_STREAM.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$tencent$trpc$core$rpc$InvokeMode[InvokeMode.CLIENT_STREAM.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$tencent$trpc$core$rpc$InvokeMode[InvokeMode.DUPLEX_STREAM.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public StreamConsumerInvoker(ConsumerConfig<T> consumerConfig, ProtocolConfig protocolConfig, ClientTransport clientTransport) {
        this.consumerConfig = (ConsumerConfig) Objects.requireNonNull(consumerConfig, "consumerConfig is null");
        this.backendConfig = (BackendConfig) Objects.requireNonNull(consumerConfig.getBackendConfig(), "backendConfig is null");
        this.protocolConfig = (ProtocolConfig) Objects.requireNonNull(protocolConfig, "protocolConfig is null");
        this.clientTransport = (ClientTransport) Objects.requireNonNull(clientTransport, "clientTransport is null");
    }

    public ConsumerConfig<T> getConfig() {
        return this.consumerConfig;
    }

    public ProtocolConfig getProtocolConfig() {
        return this.protocolConfig;
    }

    public Class<T> getInterface() {
        return this.backendConfig.getServiceInterface();
    }

    public CompletionStage<Response> invoke(Request request) {
        CompletableFuture completableFuture = new CompletableFuture();
        this.clientTransport.connect().subscribe(rpcConnection -> {
            try {
                TRpcStreamRequester tRpcStreamRequester = new TRpcStreamRequester(this.protocolConfig, rpcConnection, this.consumerConfig.getBackendConfig());
                DefResponse defResponse = new DefResponse();
                defResponse.setValue(doInvoke(request, tRpcStreamRequester, rpcConnection));
                completableFuture.complete(defResponse);
            } catch (Throwable th) {
                rpcConnection.dispose();
                completableFuture.completeExceptionally(TRpcException.newFrameException(TRPCProtocol.TrpcRetCode.TRPC_CLIENT_NETWORK_ERR_VALUE, "do invoke failed", th));
            }
        }, th -> {
            completableFuture.completeExceptionally(TRpcException.newFrameException(TRPCProtocol.TrpcRetCode.TRPC_CLIENT_CONNECT_ERR_VALUE, "do connect failed", th));
        });
        return completableFuture;
    }

    private Object doInvoke(Request request, TRpcStreamRequester tRpcStreamRequester, RpcConnection rpcConnection) {
        RpcInvocation invocation = request.getInvocation();
        InvokeMode invokeMode = invocation.getInvokeMode();
        RpcContext context = request.getContext();
        RpcContextUtils.putValueMapValue(context, "ctx_rpc_call_info", request.getMeta().getCallInfo());
        RpcContextUtils.putValueMapValue(context, "ctx_rpc_invocation", invocation);
        AtomicInteger atomicInteger = new AtomicInteger(2);
        Consumer consumer = signalType -> {
            if (atomicInteger.decrementAndGet() == 0) {
                logger.debug("close stream invoker connection: {}", new Object[]{rpcConnection});
                rpcConnection.dispose();
            }
        };
        Object firstArgument = invocation.getFirstArgument();
        switch (AnonymousClass1.$SwitchMap$com$tencent$trpc$core$rpc$InvokeMode[invokeMode.ordinal()]) {
            case 1:
                atomicInteger.decrementAndGet();
                return tRpcStreamRequester.serverStream(context, firstArgument).doFinally(consumer);
            case 2:
                return tRpcStreamRequester.clientStream(context, Flux.from((Publisher) firstArgument).doFinally(consumer)).doFinally(consumer);
            case 3:
                return tRpcStreamRequester.duplexStream(context, Flux.from((Publisher) firstArgument).doFinally(consumer)).doFinally(consumer);
            default:
                throw new UnsupportedOperationException("unknown method type " + invokeMode);
        }
    }
}
