/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.common.remote.client.rsocket;

import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseCode;
import com.alibaba.nacos.api.remote.response.UnKnowResponse;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.rsocket.RsocketConnection;
import com.alibaba.nacos.common.remote.client.rsocket.RsocketUtils;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.RSocketProxy;
import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
 * {@link RpcClient} implementation that talks to the server over RSocket (TCP transport).
 *
 * <p>On connect, the client sends a {@link ConnectionSetupRequest} as the RSocket setup payload and
 * installs a responder so that requests pushed by the server are dispatched to
 * {@code handleServerRequest}. When the underlying socket closes while the client is still running
 * and the connection has not been abandoned, the client is marked unhealthy and an asynchronous
 * server switch is triggered.
 */
public class RsocketRpcClient
extends RpcClient {
    static final Logger LOGGER = LoggerFactory.getLogger("com.alibaba.nacos.common.remote.client");
    private static final int RSOCKET_PORT_OFFSET = 1100;
    // NOTE(review): this reference is never read or written within this class — confirm whether it can be removed.
    private final AtomicReference<RSocket> rSocketClient = new AtomicReference<>();

    /**
     * Creates a client with the given name.
     *
     * @param name client name, passed through to {@link RpcClient}
     */
    public RsocketRpcClient(String name) {
        super(name);
    }

    @Override
    public ConnectionType getConnectionType() {
        return ConnectionType.RSOCKET;
    }

    @Override
    public int rpcPortOffset() {
        return RSOCKET_PORT_OFFSET;
    }

    /**
     * Opens an RSocket connection to the given server and blocks until it is established.
     *
     * @param serverInfo address of the server to connect to
     * @return the established connection, with a close listener already registered
     * @throws Exception if the connection cannot be established; any partially created socket is
     *                   disposed before the exception is rethrown
     */
    @Override
    public Connection connectToServer(RpcClient.ServerInfo serverInfo) throws Exception {
        RSocket rSocket = null;
        try {
            ConnectionSetupRequest setupRequest = new ConnectionSetupRequest();
            Payload setupPayload = RsocketUtils.convertRequestToPayload(setupRequest, this.buildMeta());
            TcpClientTransport transport =
                    TcpClientTransport.create(serverInfo.getServerIp(), serverInfo.getServerPort());
            rSocket = RSocketConnector.create()
                    .setupPayload(setupPayload)
                    .acceptor(new SocketAcceptor() {

                        @Override
                        public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
                            return Mono.just(RsocketRpcClient.this.buildServerRequestHandler(sendingSocket));
                        }
                    })
                    .connect(transport)
                    .block();
            RsocketConnection connection = new RsocketConnection(serverInfo, rSocket);
            this.fireOnCloseEvent(rSocket, connection);
            return connection;
        }
        catch (Exception e) {
            // Do not leak a half-open socket when setup fails after the socket was created.
            this.shutDownRsocketClient(rSocket);
            throw e;
        }
    }

    /**
     * Builds the responder that handles requests pushed from the server to this client.
     *
     * @param sendingSocket the socket handed to the acceptor; all other interactions are delegated to it
     * @return a proxy that handles request-response and fire-and-forget interactions locally
     */
    private RSocket buildServerRequestHandler(RSocket sendingSocket) {
        return new RSocketProxy(sendingSocket) {

            @Override
            public Mono<Payload> requestResponse(Payload payload) {
                try {
                    RsocketUtils.PlainRequest plainRequest = RsocketUtils.parsePlainRequestFromPayload(payload);
                    try {
                        Response response = RsocketRpcClient.this.handleServerRequest(plainRequest.getBody(), plainRequest.metadata);
                        response.setRequestId(plainRequest.getBody().getRequestId());
                        return Mono.just(RsocketUtils.convertResponseToPayload(response));
                    }
                    catch (Exception e) {
                        LOGGER.error("Fail to handle server request", e);
                        // The request was parsed, so the failure can still be correlated by request id.
                        UnKnowResponse response = RsocketRpcClient.buildFailResponse(e);
                        response.setRequestId(plainRequest.getBody().getRequestId());
                        return Mono.just(RsocketUtils.convertResponseToPayload(response));
                    }
                }
                catch (Exception e) {
                    LOGGER.error("Fail to parse or answer server request", e);
                    // No usable request is available here, so the response carries no request id.
                    UnKnowResponse response = RsocketRpcClient.buildFailResponse(e);
                    // Payload copy retained from the original implementation.
                    return Mono.just(DefaultPayload.create(RsocketUtils.convertResponseToPayload(response)));
                }
            }

            @Override
            public Mono<Void> fireAndForget(Payload payload) {
                RsocketUtils.PlainRequest plainRequest = RsocketUtils.parsePlainRequestFromPayload(payload);
                RsocketRpcClient.this.handleServerRequest(plainRequest.getBody(), plainRequest.metadata);
                return Mono.empty();
            }
        };
    }

    /**
     * Creates a FAIL response whose message is taken from the given exception.
     *
     * @param e the failure to report
     * @return a response with result code {@code ResponseCode.FAIL} and the exception message
     */
    private static UnKnowResponse buildFailResponse(Exception e) {
        UnKnowResponse response = new UnKnowResponse();
        response.setResultCode(ResponseCode.FAIL.getCode());
        response.setMessage(e.getMessage());
        return response;
    }

    /**
     * Disposes the given socket if it exists and has not been disposed yet.
     *
     * @param client the socket to dispose; may be {@code null}
     */
    void shutDownRsocketClient(RSocket client) {
        if (client != null && !client.isDisposed()) {
            client.dispose();
        }
    }

    /**
     * Subscribes to the socket's close signal so that an unexpected close triggers a server switch.
     *
     * @param rSocket         the socket to watch
     * @param connectionInner the connection backed by {@code rSocket}; abandoned connections are ignored
     */
    void fireOnCloseEvent(RSocket rSocket, final Connection connectionInner) {
        Subscriber<Void> subscriber = new Subscriber<Void>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                // Nothing is requested: only the terminal signals of onClose() are of interest.
            }

            @Override
            public void onNext(Void aVoid) {
            }

            @Override
            public void onError(Throwable throwable) {
                RsocketRpcClient.this.handleConnectionClosed(connectionInner, "onError", throwable);
            }

            @Override
            public void onComplete() {
                RsocketRpcClient.this.handleConnectionClosed(connectionInner, "onCompleted", null);
            }
        };
        rSocket.onClose().subscribe(subscriber);
    }

    /**
     * Reacts to a terminal signal of a socket's close stream.
     *
     * <p>If the client is running and the connection is not abandoned, the client status is moved
     * from RUNNING to UNHEALTHY and, when that transition succeeds, a server switch is started.
     * Otherwise the event is only logged.
     *
     * @param connection the connection whose socket was closed
     * @param event      name of the terminal signal, used in log output
     * @param cause      the error that closed the socket, or {@code null} on normal completion
     */
    private void handleConnectionClosed(Connection connection, String event, Throwable cause) {
        if (!this.isRunning() || connection.isAbandon()) {
            LOGGER.info("client is not running status ,ignore {} event , connection={}", event, connection);
            return;
        }
        if (cause == null) {
            LOGGER.warn("{} ,switch server , connection={}", event, connection);
        } else {
            LOGGER.warn(event + " ,switch server , connection=" + connection, cause);
        }
        // Only the caller that wins the RUNNING -> UNHEALTHY transition starts the switch.
        if (this.rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
            this.switchServerAsync();
        }
    }
}

