package com.alibaba.nacos.common.remote.client.rsocket;

import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseCode;
import com.alibaba.nacos.api.remote.response.UnKnowResponse;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.rsocket.RsocketUtils;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.RSocketProxy;
import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/alibaba/nacos/common/remote/client/rsocket/RsocketRpcClient.class */
/**
 * {@link RpcClient} implementation that talks to the server over RSocket (TCP transport).
 *
 * <p>The client opens a connection with a {@link ConnectionSetupRequest} as setup payload and
 * registers a responder so that requests pushed by the server are dispatched to
 * {@code handleServerRequest}. When the underlying socket closes while the client is still
 * running, the client is marked unhealthy and an asynchronous server switch is triggered.
 */
public class RsocketRpcClient extends RpcClient {
    static final Logger LOGGER = LoggerFactory.getLogger("com.alibaba.nacos.common.remote.client");
    private static final int RSOCKET_PORT_OFFSET = 1100;
    // Initialised in the constructor; no method of this class reads or updates it.
    private final AtomicReference<RSocket> rSocketClient;

    /**
     * Creates a client.
     *
     * @param str value forwarded unchanged to the {@link RpcClient} constructor
     */
    public RsocketRpcClient(String str) {
        super(str);
        this.rSocketClient = new AtomicReference<>();
    }

    /**
     * Returns the connection type of this client.
     *
     * @return always {@link ConnectionType#RSOCKET}
     */
    @Override // com.alibaba.nacos.common.remote.client.RpcClient
    public ConnectionType getConnectionType() {
        return ConnectionType.RSOCKET;
    }

    /**
     * Returns the port offset used for the RSocket endpoint.
     *
     * @return {@value #RSOCKET_PORT_OFFSET}
     */
    @Override // com.alibaba.nacos.common.remote.client.RpcClient
    public int rpcPortOffset() {
        return RSOCKET_PORT_OFFSET;
    }

    /**
     * Opens an RSocket connection to the given server and blocks until it is established.
     *
     * @param serverInfo address (ip and port) of the server to connect to
     * @return a connection wrapping the established socket
     * @throws Exception if the connection cannot be established; any socket that was already
     *                   created is disposed before the exception is rethrown
     */
    @Override // com.alibaba.nacos.common.remote.client.RpcClient
    public Connection connectToServer(RpcClient.ServerInfo serverInfo) throws Exception {
        RSocket rSocket = null;
        try {
            rSocket = RSocketConnector.create()
                    .setupPayload(RsocketUtils.convertRequestToPayload(new ConnectionSetupRequest(), buildMeta()))
                    .acceptor(new SocketAcceptor() {
                        @Override
                        public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket sendingSocket) {
                            return Mono.just(newServerRequestResponder(sendingSocket));
                        }
                    })
                    .connect(TcpClientTransport.create(serverInfo.getServerIp(), serverInfo.getServerPort()))
                    .block();
            RsocketConnection rsocketConnection = new RsocketConnection(serverInfo, rSocket);
            fireOnCloseEvent(rSocket, rsocketConnection);
            return rsocketConnection;
        } catch (Exception e) {
            // Do not leak a half-initialised socket when setup fails after connect.
            shutDownRsocketClient(rSocket);
            throw e;
        }
    }

    /**
     * Builds the responder that serves requests pushed by the server.
     *
     * @param sendingSocket socket handed to the acceptor; all other interactions are delegated to it
     * @return a proxy that handles request/response and fire-and-forget server requests
     */
    private RSocket newServerRequestResponder(RSocket sendingSocket) {
        return new RSocketProxy(sendingSocket) {
            @Override
            public Mono<Payload> requestResponse(Payload payload) {
                try {
                    RsocketUtils.PlainRequest plainRequest = RsocketUtils.parsePlainRequestFromPayload(payload);
                    try {
                        Response response = RsocketRpcClient.this
                                .handleServerRequest(plainRequest.getBody(), plainRequest.metadata);
                        response.setRequestId(plainRequest.getBody().getRequestId());
                        return Mono.just(RsocketUtils.convertResponseToPayload(response));
                    } catch (Exception e) {
                        LOGGER.error("Fail to handle server request", e);
                        // The request was parsed, so the failure can be correlated by request id.
                        UnKnowResponse failResponse = newFailResponse(e);
                        failResponse.setRequestId(plainRequest.getBody().getRequestId());
                        return Mono.just(RsocketUtils.convertResponseToPayload(failResponse));
                    }
                } catch (Exception e) {
                    // Reached when parsing fails or when building the correlated failure reply fails;
                    // no request id is available for the reply in that case.
                    LOGGER.error("Fail to parse or answer server request", e);
                    return Mono.just(DefaultPayload.create(RsocketUtils.convertResponseToPayload(newFailResponse(e))));
                }
            }

            @Override
            public Mono<Void> fireAndForget(Payload payload) {
                try {
                    RsocketUtils.PlainRequest plainRequest = RsocketUtils.parsePlainRequestFromPayload(payload);
                    RsocketRpcClient.this.handleServerRequest(plainRequest.getBody(), plainRequest.metadata);
                    return Mono.empty();
                } catch (Exception e) {
                    // Report the failure through the reactive pipeline instead of throwing synchronously.
                    LOGGER.error("Fail to handle fire-and-forget server request", e);
                    return Mono.error(e);
                }
            }
        };
    }

    /**
     * Creates a response with result code {@link ResponseCode#FAIL} carrying the exception message.
     *
     * @param cause the failure to report
     * @return the failure response, without a request id
     */
    private static UnKnowResponse newFailResponse(Exception cause) {
        UnKnowResponse failResponse = new UnKnowResponse();
        failResponse.setResultCode(ResponseCode.FAIL.getCode());
        failResponse.setMessage(cause.getMessage());
        return failResponse;
    }

    /**
     * Disposes the given socket if it exists and has not been disposed yet.
     *
     * @param rSocket socket to dispose; may be {@code null}
     */
    void shutDownRsocketClient(RSocket rSocket) {
        if (rSocket == null || rSocket.isDisposed()) {
            return;
        }
        rSocket.dispose();
    }

    /**
     * Subscribes to the close signal of the socket so that a server switch is triggered when the
     * connection terminates, either normally or with an error.
     *
     * @param rSocket    socket whose close signal is observed
     * @param connection connection associated with the socket; close events are ignored once it is abandoned
     */
    void fireOnCloseEvent(RSocket rSocket, final Connection connection) {
        rSocket.onClose().subscribe(new Subscriber<Void>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                // No demand is requested: a Mono<Void> emits no items, only a terminal signal.
            }

            @Override
            public void onNext(Void unused) {
            }

            @Override
            public void onError(Throwable th) {
                LOGGER.warn("Rsocket connection closed with error", th);
                switchServerOnClose(connection, "onError");
            }

            @Override
            public void onComplete() {
                switchServerOnClose(connection, "onCompleted");
            }
        });
    }

    /**
     * Marks the client unhealthy and triggers an asynchronous server switch after a close event,
     * unless the client is not running or the connection has been abandoned.
     *
     * @param connection connection whose socket was closed
     * @param event      name of the terminal signal, used for logging only
     */
    private void switchServerOnClose(Connection connection, String event) {
        if (!isRunning() || connection.isAbandon()) {
            LOGGER.info("client is not running status, ignore {} event", event);
            return;
        }
        LOGGER.info("{}, switch server", event);
        // Only the caller that wins the RUNNING -> UNHEALTHY transition starts the switch.
        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
            switchServerAsync();
        }
    }
}
