package com.tencent.trpc.proto.standard.stream;

import com.google.common.collect.Maps;
import com.tencent.trpc.core.common.config.BackendConfig;
import com.tencent.trpc.core.common.config.ProtocolConfig;
import com.tencent.trpc.core.exception.TRpcException;
import com.tencent.trpc.core.logger.Logger;
import com.tencent.trpc.core.logger.LoggerFactory;
import com.tencent.trpc.core.rpc.RpcContext;
import com.tencent.trpc.core.rpc.RpcInvocation;
import com.tencent.trpc.core.stream.StreamCall;
import com.tencent.trpc.core.stream.transport.RpcConnection;
import com.tencent.trpc.core.utils.RpcContextUtils;
import com.tencent.trpc.core.worker.spi.WorkerPool;
import com.tencent.trpc.proto.standard.common.TRPCProtocol;
import com.tencent.trpc.proto.standard.stream.codec.TRpcStreamFrameCodec;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/tencent/trpc/proto/standard/stream/TRpcStreamRequester.class */
public class TRpcStreamRequester extends BaseTRpcStreamRequesterResponder implements StreamCall {
    private static final Logger logger = LoggerFactory.getLogger(TRpcStreamRequester.class);
    private final ConcurrentMap<Integer, Sinks.One<TRPCProtocol.TrpcStreamInitMeta>> streamSetups;
    private final AtomicInteger streamIdGenerator;
    private final BackendConfig backendConfig;
    private final WorkerPool workerPool;
    private final Scheduler scheduler;

    public TRpcStreamRequester(ProtocolConfig protocolConfig, RpcConnection rpcConnection, BackendConfig backendConfig) {
        super(protocolConfig, rpcConnection);
        this.streamSetups = Maps.newConcurrentMap();
        this.streamIdGenerator = new AtomicInteger(100);
        this.backendConfig = (BackendConfig) Objects.requireNonNull(backendConfig, "backendConfig is null");
        this.workerPool = backendConfig.getWorkerPoolObj();
        this.scheduler = Schedulers.fromExecutor(this.workerPool.toExecutor());
    }

    public <ReqT, RspT> Flux<RspT> serverStream(RpcContext rpcContext, ReqT reqt) {
        return duplexStream(rpcContext, Mono.just(reqt));
    }

    public <ReqT, RspT> Mono<RspT> clientStream(RpcContext rpcContext, Publisher<ReqT> publisher) {
        return duplexStream(rpcContext, publisher).next();
    }

    public <ReqT, RspT> Flux<RspT> duplexStream(RpcContext rpcContext, Publisher<ReqT> publisher) {
        RpcInvocation rpcInvocation = (RpcInvocation) RpcContextUtils.getValueMapValue(rpcContext, "ctx_rpc_invocation");
        Objects.requireNonNull(rpcInvocation, "invocation");
        int andIncrement = this.streamIdGenerator.getAndIncrement();
        int windowSize = getWindowSize(this.protocolConfig.getReceiveBuffer());
        Sinks.One<TRPCProtocol.TrpcStreamInitMeta> one = Sinks.one();
        this.streamSetups.put(Integer.valueOf(andIncrement), one);
        Sinks.Many<ByteBuf> onBackpressureBuffer = Sinks.many().unicast().onBackpressureBuffer();
        this.receivers.put(Integer.valueOf(andIncrement), onBackpressureBuffer);
        this.connection.send(TRpcStreamFrameCodec.encodeStreamInitRequestFrame(this.connection.alloc(), andIncrement, windowSize, TRpcStreamFrameCodec.buildRpcCallInfo(rpcContext), this.backendConfig));
        return one.asMono().timeout(getStreamBuildTimeout(rpcContext)).doOnError(th -> {
            if (th instanceof TRpcException) {
                TRpcException tRpcException = (TRpcException) th;
                errorResetStream(andIncrement, tRpcException.getCode(), tRpcException.getMessage());
            } else if (th instanceof TimeoutException) {
                errorResetStream(andIncrement, TRPCProtocol.TrpcRetCode.TRPC_STREAM_CLIENT_READ_TIMEOUT_ERR_VALUE, "read stream timeout");
            } else {
                errorResetStream(andIncrement, TRPCProtocol.TrpcRetCode.TRPC_STREAM_UNKNOWN_ERR_VALUE, "init stream failed");
            }
        }).doFinally(signalType -> {
            this.streamSetups.remove(Integer.valueOf(andIncrement));
        }).flatMapMany(trpcStreamInitMeta -> {
            TRpcStreamFrameCodec newDataFrameCodec = TRpcStreamFrameCodec.newDataFrameCodec(this.protocolConfig, this.connection.alloc(), trpcStreamInitMeta.getContentEncoding(), trpcStreamInitMeta.getContentType());
            StreamRemoteSubscriber<?> streamRemoteSubscriber = new StreamRemoteSubscriber<>(this.workerPool, this.connection, newDataFrameCodec, andIncrement, trpcStreamInitMeta.getInitWindowSize());
            this.subscribers.put(Integer.valueOf(andIncrement), streamRemoteSubscriber);
            Flux.from(publisher).subscribeOn(this.scheduler).doFinally(signalType2 -> {
                this.subscribers.remove(Integer.valueOf(andIncrement));
            }).subscribe(streamRemoteSubscriber);
            return onBackpressureBuffer.asFlux().publishOn(this.scheduler).doOnNext(new StreamLocalConsumer(this.connection, andIncrement, windowSize)).map(byteBuf -> {
                try {
                    Object decodeDataFrameData = newDataFrameCodec.decodeDataFrameData(byteBuf, (Class) rpcInvocation.getRpcMethodInfo().getActualReturnType());
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return decodeDataFrameData;
                } catch (Throwable th2) {
                    ReferenceCountUtil.safeRelease(byteBuf);
                    throw th2;
                }
            });
        }).doFinally(signalType2 -> {
            this.receivers.remove(Integer.valueOf(andIncrement));
        });
    }

    private Duration getStreamBuildTimeout(RpcContext rpcContext) {
        long timeoutMills = rpcContext.toClientContext().getTimeoutMills();
        if (timeoutMills <= 0) {
            timeoutMills = 1000;
        }
        return Duration.ofMillis(timeoutMills);
    }

    @Override // com.tencent.trpc.proto.standard.stream.BaseTRpcStreamRequesterResponder
    protected void handleStreamInit(int i, ByteBuf byteBuf) {
        Sinks.One<TRPCProtocol.TrpcStreamInitMeta> one = this.streamSetups.get(Integer.valueOf(i));
        if (one == null) {
            errorResetStream(i, TRPCProtocol.TrpcRetCode.TRPC_STREAM_UNKNOWN_ERR_VALUE, "not found stream");
            return;
        }
        TRPCProtocol.TrpcStreamInitMeta decodeStreamInitFrame = TRpcStreamFrameCodec.decodeStreamInitFrame(byteBuf);
        Objects.requireNonNull(decodeStreamInitFrame, "the stream init frame is null");
        TRPCProtocol.TrpcStreamInitResponseMeta responseMeta = decodeStreamInitFrame.getResponseMeta();
        if (responseMeta.getRet() == 0) {
            one.emitValue(decodeStreamInitFrame, Sinks.EmitFailureHandler.FAIL_FAST);
        } else {
            logger.error("stream {} init failed: {}", new Object[]{Integer.valueOf(i), responseMeta});
            one.emitError(TRpcException.newFrameException(responseMeta.getRet(), responseMeta.getErrorMsg().toStringUtf8()), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    }
}
