/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.common.remote.client.rsocket;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.RequestFuture;
import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.rsocket.RsocketUtils;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.publisher.Mono;

public class RsocketConnection
extends Connection {
    private RSocket rSocketClient;

    public RsocketConnection(RpcClient.ServerInfo serverInfo, RSocket rSocketClient) {
        super(serverInfo);
        this.rSocketClient = rSocketClient;
    }

    public Response request(Request request, RequestMeta requestMeta) throws NacosException {
        return this.request(request, requestMeta, 3000L);
    }

    public Response request(Request request, RequestMeta requestMeta, long timeouts) throws NacosException {
        Payload response = (Payload)this.rSocketClient.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta)).block(Duration.ofMillis(timeouts));
        return RsocketUtils.parseResponseFromPayload(response);
    }

    public RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException {
        Mono response = this.rSocketClient.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta));
        final CompletableFuture payloadCompletableFuture = response.toFuture();
        return new RequestFuture(){

            public boolean isDone() {
                return payloadCompletableFuture.isDone();
            }

            public Response get() throws InterruptedException, ExecutionException {
                Payload block = (Payload)payloadCompletableFuture.get();
                return RsocketUtils.parseResponseFromPayload(block);
            }

            public Response get(long timeout) throws TimeoutException, InterruptedException, ExecutionException {
                Payload block = (Payload)payloadCompletableFuture.get(timeout, TimeUnit.MILLISECONDS);
                return RsocketUtils.parseResponseFromPayload(block);
            }
        };
    }

    public void asyncRequest(Request request, RequestMeta requestMeta, final RequestCallBack requestCallBack) throws NacosException {
        try {
            Mono response = this.rSocketClient.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta));
            CompletableFuture payloadCompletableFuture = response.toFuture();
            payloadCompletableFuture.acceptEither((CompletionStage)RsocketConnection.failAfter(requestCallBack.getTimeout()), new Consumer<Payload>(){

                @Override
                public void accept(Payload payload) {
                    requestCallBack.onResponse(RsocketUtils.parseResponseFromPayload(payload));
                }
            });
            payloadCompletableFuture.exceptionally(new Function<Throwable, Payload>(){

                @Override
                public Payload apply(Throwable throwable) {
                    requestCallBack.onException(throwable);
                    return null;
                }
            });
        }
        catch (Exception e) {
            requestCallBack.onException((Throwable)e);
        }
    }

    private static <T> CompletableFuture<T> failAfter(final long timeouts) {
        final CompletableFuture promise = new CompletableFuture();
        RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule((Callable)new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                TimeoutException ex = new TimeoutException("Timeout after " + timeouts);
                return promise.completeExceptionally(ex);
            }
        }, timeouts, TimeUnit.MILLISECONDS);
        return promise;
    }

    public void close() {
        if (this.rSocketClient != null && !this.rSocketClient.isDisposed()) {
            this.rSocketClient.dispose();
        }
    }

    public RSocket getrSocketClient() {
        return this.rSocketClient;
    }

    public String toString() {
        return "RsocketConnection{serverInfo=" + this.serverInfo + ", labels=" + this.labels + '}';
    }
}

