package com.tencent.trpc.proto.standard.stream;

import com.google.common.collect.Maps;
import com.tencent.trpc.core.common.config.ProtocolConfig;
import com.tencent.trpc.core.logger.Logger;
import com.tencent.trpc.core.logger.LoggerFactory;
import com.tencent.trpc.core.rpc.InvokeMode;
import com.tencent.trpc.core.rpc.ProviderInvoker;
import com.tencent.trpc.core.rpc.RpcServerContext;
import com.tencent.trpc.core.rpc.common.RpcMethodInfo;
import com.tencent.trpc.core.rpc.common.RpcMethodInfoAndInvoker;
import com.tencent.trpc.core.rpc.def.DefMethodInfoRegister;
import com.tencent.trpc.core.stream.transport.RpcConnection;
import com.tencent.trpc.core.worker.spi.WorkerPool;
import com.tencent.trpc.proto.standard.common.TRPCProtocol;
import com.tencent.trpc.proto.standard.stream.codec.TRpcStreamFrameCodec;
import com.tencent.trpc.proto.standard.stream.config.TRpcStreamConstants;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/tencent/trpc/proto/standard/stream/TRpcStreamResponder.class */
public class TRpcStreamResponder extends BaseTRpcStreamRequesterResponder {
    private static final Logger logger = LoggerFactory.getLogger(TRpcStreamResponder.class);
    private final ConcurrentMap<String, StreamServiceInvoker> services;
    private final DefMethodInfoRegister methodInfoRegister;

    /* renamed from: com.tencent.trpc.proto.standard.stream.TRpcStreamResponder$1, reason: invalid class name */
    /* loaded from: input_file:com/tencent/trpc/proto/standard/stream/TRpcStreamResponder$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$tencent$trpc$core$rpc$InvokeMode = new int[InvokeMode.values().length];

        static {
            try {
                $SwitchMap$com$tencent$trpc$core$rpc$InvokeMode[InvokeMode.CLIENT_STREAM.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$tencent$trpc$core$rpc$InvokeMode[InvokeMode.SERVER_STREAM.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$tencent$trpc$core$rpc$InvokeMode[InvokeMode.DUPLEX_STREAM.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public TRpcStreamResponder(ProtocolConfig protocolConfig, RpcConnection rpcConnection, DefMethodInfoRegister defMethodInfoRegister) {
        super(protocolConfig, rpcConnection);
        this.services = Maps.newConcurrentMap();
        this.methodInfoRegister = (DefMethodInfoRegister) Objects.requireNonNull(defMethodInfoRegister, "methodInfoRegister is null");
        logger.debug("created new rpcResponder for {}", new Object[]{protocolConfig});
    }

    @Override // com.tencent.trpc.proto.standard.stream.BaseTRpcStreamRequesterResponder
    protected void handleStreamInit(int i, ByteBuf byteBuf) {
        TRPCProtocol.TrpcStreamInitMeta decodeStreamInitFrame = TRpcStreamFrameCodec.decodeStreamInitFrame(byteBuf);
        TRPCProtocol.TrpcStreamInitRequestMeta requestMeta = decodeStreamInitFrame.getRequestMeta();
        String stringUtf8 = requestMeta.getFunc().toStringUtf8();
        RpcMethodInfoAndInvoker route = this.methodInfoRegister.route(stringUtf8);
        if (route == null) {
            errorInitStream(i, TRPCProtocol.TrpcRetCode.TRPC_INVOKE_UNKNOWN_ERR_VALUE, String.format("func %s not exist", stringUtf8));
            return;
        }
        ProviderInvoker invoker = route.getInvoker();
        RpcMethodInfo methodInfo = route.getMethodInfo();
        StreamServiceInvoker computeIfAbsent = this.services.computeIfAbsent(stringUtf8, str -> {
            return new StreamServiceInvoker(invoker.getImpl(), methodInfo.getMethod());
        });
        Sinks.Many<ByteBuf> onBackpressureBuffer = Sinks.many().unicast().onBackpressureBuffer();
        this.receivers.put(Integer.valueOf(i), onBackpressureBuffer);
        TRpcStreamFrameCodec newDataFrameCodec = TRpcStreamFrameCodec.newDataFrameCodec(this.protocolConfig, this.connection.alloc(), decodeStreamInitFrame.getContentEncoding(), decodeStreamInitFrame.getContentType());
        WorkerPool workerPoolObj = invoker.getConfig().getWorkerPoolObj();
        int windowSize = decodeStreamInitFrame.getInitWindowSize() == 0 ? 0 : getWindowSize(this.protocolConfig.getReceiveBuffer());
        Flux map = onBackpressureBuffer.asFlux().publishOn(Schedulers.fromExecutor(workerPoolObj.toExecutor())).doOnNext(new StreamLocalConsumer(this.connection, i, windowSize)).map(byteBuf2 -> {
            try {
                Object decodeDataFrameData = newDataFrameCodec.decodeDataFrameData(byteBuf2, (Class) methodInfo.getActualParamsTypes()[1]);
                ReferenceCountUtil.safeRelease(byteBuf2);
                return decodeDataFrameData;
            } catch (Throwable th) {
                ReferenceCountUtil.safeRelease(byteBuf2);
                throw th;
            }
        });
        workerPoolObj.execute(() -> {
            Mono duplexStream;
            RpcServerContext rpcServerContext = new RpcServerContext();
            requestMeta.getTransInfoMap().forEach((str2, byteString) -> {
                rpcServerContext.getReqAttachMap().put(str2, byteString.toByteArray());
            });
            switch (AnonymousClass1.$SwitchMap$com$tencent$trpc$core$rpc$InvokeMode[computeIfAbsent.invokeMode.ordinal()]) {
                case 1:
                    duplexStream = computeIfAbsent.clientStream(rpcServerContext, map);
                    break;
                case 2:
                    duplexStream = map.next().flatMapMany(obj -> {
                        return computeIfAbsent.serverStream(rpcServerContext, obj);
                    });
                    break;
                case 3:
                    duplexStream = computeIfAbsent.duplexStream(rpcServerContext, map);
                    break;
                default:
                    errorInitStream(i, TRPCProtocol.TrpcRetCode.TRPC_INVOKE_UNKNOWN_ERR_VALUE, String.format("stream method not support invoke mode %s", computeIfAbsent.invokeMode));
                    return;
            }
            this.connection.send(TRpcStreamFrameCodec.encodeStreamInitResponseFrame(this.connection.alloc(), i, windowSize, this.protocolConfig, 0, TRpcStreamConstants.RPC_DEFAULT_RET_CODE_OK));
            StreamRemoteSubscriber<?> streamRemoteSubscriber = new StreamRemoteSubscriber<>(workerPoolObj, this.connection, newDataFrameCodec, i, decodeStreamInitFrame.getInitWindowSize());
            this.subscribers.put(Integer.valueOf(i), streamRemoteSubscriber);
            Flux.from(duplexStream).doFinally(signalType -> {
                this.subscribers.remove(Integer.valueOf(i));
            }).subscribe(streamRemoteSubscriber);
        });
    }
}
