package com.tencent.trpc.proto.standard.stream.server;

import com.tencent.trpc.core.common.config.ProtocolConfig;
import com.tencent.trpc.core.common.config.ProviderConfig;
import com.tencent.trpc.core.exception.TRpcException;
import com.tencent.trpc.core.extension.ExtensionLoader;
import com.tencent.trpc.core.rpc.AbstractRpcServer;
import com.tencent.trpc.core.rpc.ProviderInvoker;
import com.tencent.trpc.core.rpc.def.DefMethodInfoRegister;
import com.tencent.trpc.core.stream.Closeable;
import com.tencent.trpc.core.stream.transport.codec.FrameDecoder;
import com.tencent.trpc.core.stream.transport.spi.ServerTransportFactory;
import com.tencent.trpc.proto.standard.stream.TRpcStreamResponder;
import java.util.Objects;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/tencent/trpc/proto/standard/stream/server/TRpcStreamServer.class */
/**
 * Stream-mode RPC server: registers exported providers in a
 * {@link DefMethodInfoRegister} and serves them over a transport obtained from the
 * {@link ServerTransportFactory} extension named by the protocol config.
 */
public class TRpcStreamServer extends AbstractRpcServer {
    /** Supplies the frame decoder handed to the transport factory when the server is opened. */
    private final Supplier<FrameDecoder> frameDecoder;
    /** Registry of exported providers; shared with every {@link TRpcStreamResponder} created per connection. */
    private final DefMethodInfoRegister methodInfoRegister = new DefMethodInfoRegister();
    /** Handle of the started transport; {@code null} until {@link #doOpen()} succeeds and after {@link #doClose()}. */
    private Closeable service;

    /**
     * Creates a server for the given protocol configuration.
     *
     * @param protocolConfig protocol configuration, must not be {@code null}
     * @param supplier supplier of frame decoders, must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public TRpcStreamServer(ProtocolConfig protocolConfig, Supplier<FrameDecoder> supplier) {
        // requireNonNull is generic, so no (raw) cast is needed on its result.
        this.protocolConfig = Objects.requireNonNull(protocolConfig, "protocolConfig is null");
        this.frameDecoder = Objects.requireNonNull(supplier, "frameDecoder is null");
    }

    /**
     * Registers the provider in the method registry.
     *
     * @param providerInvoker invoker of the provider being exported
     */
    protected <T> void doExport(ProviderInvoker<T> providerInvoker) {
        this.methodInfoRegister.register(providerInvoker);
    }

    /**
     * Removes the provider from the method registry.
     *
     * @param providerConfig configuration of the provider being un-exported
     */
    protected <T> void doUnExport(ProviderConfig<T> providerConfig) {
        this.methodInfoRegister.unregister(providerConfig);
    }

    /**
     * Creates the server transport, starts it, and blocks until startup completes.
     *
     * <p>On success the returned handle is stored in {@link #service}. A startup error is
     * mapped to a frame exception (code 31, "create service failed") that keeps the original
     * error as its cause.
     *
     * @throws Exception if the transport fails to start
     */
    protected void doOpen() throws Exception {
        ServerTransportFactory transportFactory = (ServerTransportFactory) ExtensionLoader
                .getExtensionLoader(ServerTransportFactory.class)
                .getExtension(this.protocolConfig.getTransporter());
        transportFactory.create(this.protocolConfig, this.frameDecoder)
                .start(rpcConnection ->
                        Mono.just(new TRpcStreamResponder(this.protocolConfig, rpcConnection, this.methodInfoRegister))
                                .then())
                .doOnSuccess(closeable -> {
                    this.service = closeable;
                    // NOTE(review): this non-daemon thread has an empty body and exits immediately;
                    // presumably it was meant to keep the process alive - confirm intent before removing.
                    Thread thread = new Thread(() -> {
                    }, "reactor-svr");
                    thread.setDaemon(false);
                    thread.start();
                })
                // block() rethrows the upstream error itself, so the wrapping has to happen inside
                // the pipeline; recording it in a doOnError side effect and throwing after block()
                // is never reached on failure.
                .onErrorMap(th -> TRpcException.newFrameException(31, "create service failed", th))
                .block();
    }

    /**
     * Disposes the started transport, if any, and clears the handle so that a repeated call
     * is a no-op.
     */
    protected void doClose() {
        if (this.service != null) {
            this.service.dispose();
            this.service = null;
        }
    }
}
