/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.graphql.client;

import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.graphql.GraphQlRequest;
import org.springframework.graphql.GraphQlResponse;
import org.springframework.graphql.ResponseError;
import org.springframework.graphql.client.CodecDelegate;
import org.springframework.graphql.client.GraphQlTransport;
import org.springframework.graphql.client.ResponseMapGraphQlResponse;
import org.springframework.graphql.client.SubscriptionErrorException;
import org.springframework.graphql.client.WebSocketDisconnectedException;
import org.springframework.graphql.client.WebSocketGraphQlClientInterceptor;
import org.springframework.graphql.server.support.GraphQlWebSocketMessage;
import org.springframework.graphql.server.support.GraphQlWebSocketMessageType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.CodecConfigurer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;

final class WebSocketGraphQlTransport
implements GraphQlTransport {
    /** Logger used by this transport and by its nested session types. */
    private static final Log logger = LogFactory.getLog(WebSocketGraphQlTransport.class);
    /** Target URI passed to the {@link WebSocketClient} when connecting. */
    private final URI url;
    /** Headers passed to the {@link WebSocketClient} when connecting; filled in by the constructor. */
    private final HttpHeaders headers = new HttpHeaders();
    /** Client used to establish the WebSocket connection. */
    private final WebSocketClient webSocketClient;
    /** WebSocket handler that creates and publishes the {@code GraphQlSession}. */
    private final GraphQlSessionHandler graphQlSessionHandler;
    /** Cached session Mono; the cached value is invalidated when the session signals it was closed. */
    private final Mono<GraphQlSession> graphQlSessionMono;
    /** Interval handed to the session handler for keep-alive pings, or {@code null} when none was configured. */
    @Nullable
    private final Duration keepAlive;

    WebSocketGraphQlTransport(URI url, @Nullable HttpHeaders headers, WebSocketClient client, CodecConfigurer codecConfigurer, WebSocketGraphQlClientInterceptor interceptor, @Nullable Duration keepAlive) {
        Assert.notNull((Object)url, (String)"URI is required");
        Assert.notNull((Object)client, (String)"WebSocketClient is required");
        Assert.notNull((Object)codecConfigurer, (String)"CodecConfigurer is required");
        Assert.notNull((Object)interceptor, (String)"WebSocketGraphQlClientInterceptor is required");
        this.url = url;
        this.headers.putAll((Map)(headers != null ? headers : HttpHeaders.EMPTY));
        this.webSocketClient = client;
        this.keepAlive = keepAlive;
        this.graphQlSessionHandler = new GraphQlSessionHandler(codecConfigurer, interceptor, keepAlive);
        this.graphQlSessionMono = WebSocketGraphQlTransport.initGraphQlSession(this.url, this.headers, client, this.graphQlSessionHandler).cacheInvalidateWhen(GraphQlSession::notifyWhenClosed);
    }

    /**
     * Build a deferred Mono that, on each subscription, starts a WebSocket
     * connection and returns the handler's session Mono.
     *
     * @param uri the endpoint to connect to
     * @param headers headers to pass to the client
     * @param client the client used to connect
     * @param handler the handler that runs the connection and publishes the session
     * @return the session Mono, or an error Mono with an {@link IllegalStateException}
     * when the handler is stopped at subscription time
     */
    private static Mono<GraphQlSession> initGraphQlSession(URI uri, HttpHeaders headers, WebSocketClient client, GraphQlSessionHandler handler) {
        return Mono.defer(() -> {
            if (!handler.isStopped()) {
                // Capture the handler's current session Mono before starting the connection.
                Mono<GraphQlSession> pendingSession = handler.getGraphQlSession();
                client.execute(uri, headers, handler)
                        .subscribe(ignored -> {}, handler::handleWebSocketSessionError, handler::handleWebSocketSessionClosed);
                return pendingSession;
            }
            return Mono.error(new IllegalStateException("WebSocketGraphQlTransport has been stopped"));
        });
    }

    /**
     * Return the endpoint URI this transport was created with.
     */
    URI getUrl() {
        return url;
    }

    /**
     * Return this transport's headers instance (the same object that is
     * passed to the client when connecting, not a copy).
     */
    HttpHeaders getHeaders() {
        return headers;
    }

    /**
     * Return the client this transport was created with.
     */
    WebSocketClient getWebSocketClient() {
        return webSocketClient;
    }

    /**
     * Return the codec configuration, as exposed by the session handler.
     */
    CodecConfigurer getCodecConfigurer() {
        GraphQlSessionHandler sessionHandler = this.graphQlSessionHandler;
        return sessionHandler.getCodecConfigurer();
    }

    /**
     * Clear the handler's stopped flag and return a Mono that, when
     * subscribed, obtains the session and completes without a value.
     */
    Mono<Void> start() {
        graphQlSessionHandler.setStopped(false);
        Mono<GraphQlSession> sessionMono = graphQlSessionMono;
        return sessionMono.then();
    }

    /**
     * Mark the handler as stopped and return a Mono that closes the session.
     * Any error from obtaining or closing the session is replaced with an
     * empty completion.
     */
    Mono<Void> stop() {
        graphQlSessionHandler.setStopped(true);
        Mono<Void> closeMono = graphQlSessionMono.flatMap(GraphQlSession::close);
        return closeMono.onErrorResume(ignored -> Mono.empty());
    }

    /**
     * Execute a single-response request over the (possibly cached) session.
     *
     * @param request the request to send
     * @return a Mono with the response
     */
    @Override
    public Mono<GraphQlResponse> execute(GraphQlRequest request) {
        return graphQlSessionMono.flatMap(graphQlSession -> graphQlSession.execute(request));
    }

    /**
     * Execute a subscription request over the (possibly cached) session.
     *
     * @param request the request to send
     * @return a Flux with the responses of the subscription
     */
    @Override
    public Flux<GraphQlResponse> executeSubscription(GraphQlRequest request) {
        return graphQlSessionMono.flatMapMany(graphQlSession -> graphQlSession.executeSubscription(request));
    }

    /**
     * Return the keep-alive interval this transport was created with, or
     * {@code null} if none was given.
     */
    @Nullable
    Duration getKeepAlive() {
        return keepAlive;
    }

    private static class GraphQlSessionHandler
    implements WebSocketHandler {
        private final CodecDelegate codecDelegate;
        private final WebSocketGraphQlClientInterceptor interceptor;
        private Sinks.One<GraphQlSession> graphQlSessionSink;
        private final AtomicBoolean stopped = new AtomicBoolean();
        @Nullable
        private final Duration keepAlive;

        GraphQlSessionHandler(CodecConfigurer codecConfigurer, WebSocketGraphQlClientInterceptor interceptor, @Nullable Duration keepAlive) {
            this.codecDelegate = new CodecDelegate(codecConfigurer);
            this.interceptor = interceptor;
            this.graphQlSessionSink = Sinks.unsafe().one();
            this.keepAlive = keepAlive;
        }

        CodecConfigurer getCodecConfigurer() {
            return this.codecDelegate.getCodecConfigurer();
        }

        public List<String> getSubProtocols() {
            return Collections.singletonList("graphql-transport-ws");
        }

        Mono<GraphQlSession> getGraphQlSession() {
            return this.graphQlSessionSink.asMono();
        }

        void setStopped(boolean stopped) {
            this.stopped.set(stopped);
        }

        boolean isStopped() {
            return this.stopped.get();
        }

        public Mono<Void> handle(WebSocketSession session) {
            Assert.state((boolean)this.sessionNotInitialized(), (String)"This handler supports only one session at a time, for shared use.");
            GraphQlSession graphQlSession = new GraphQlSession(session);
            this.registerCloseStatusHandling(graphQlSession, session);
            Mono connectionInitMono = this.interceptor.connectionInitPayload().defaultIfEmpty(Collections.emptyMap()).map(GraphQlWebSocketMessage::connectionInit);
            Mono sendCompletion = session.send((Publisher)connectionInitMono.concatWith(graphQlSession.getRequestFlux()).map(message -> this.codecDelegate.encode(session, (GraphQlWebSocketMessage)message)));
            Mono receiveCompletion = session.receive().flatMap(webSocketMessage -> {
                if (this.sessionNotInitialized()) {
                    try {
                        GraphQlWebSocketMessage message = this.codecDelegate.decode((WebSocketMessage)webSocketMessage);
                        Assert.state((message.resolvedType() == GraphQlWebSocketMessageType.CONNECTION_ACK ? 1 : 0) != 0, () -> "Unexpected message before connection_ack: " + message);
                        return this.interceptor.handleConnectionAck((Map)message.getPayload()).then(Mono.defer(() -> {
                            Sinks.EmitResult result;
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)(graphQlSession + " initialized"));
                            }
                            if ((result = this.graphQlSessionSink.tryEmitValue((Object)graphQlSession)).isFailure()) {
                                return Mono.error((Throwable)new IllegalStateException("GraphQlSession initialized but could not be emitted: " + result));
                            }
                            return Mono.empty();
                        }));
                    }
                    catch (Throwable ex) {
                        this.graphQlSessionSink.tryEmitError(ex);
                        return Mono.error((Throwable)ex);
                    }
                }
                try {
                    GraphQlWebSocketMessage message = this.codecDelegate.decode((WebSocketMessage)webSocketMessage);
                    switch (message.resolvedType()) {
                        case NEXT: {
                            graphQlSession.handleNext(message);
                            break;
                        }
                        case PING: {
                            graphQlSession.sendPong(null);
                            break;
                        }
                        case PONG: {
                            break;
                        }
                        case ERROR: {
                            graphQlSession.handleError(message);
                            break;
                        }
                        case COMPLETE: {
                            graphQlSession.handleComplete(message);
                            break;
                        }
                        default: {
                            throw new IllegalStateException("Unexpected message type: '" + message.getType() + "'");
                        }
                    }
                }
                catch (Exception ex) {
                    if (logger.isErrorEnabled()) {
                        logger.error((Object)("Closing " + session + ": " + ex));
                    }
                    return session.close(new CloseStatus(4400, "Invalid message"));
                }
                return Mono.empty();
            }).mergeWith((Publisher)(this.keepAlive != null ? Flux.interval((Duration)this.keepAlive, (Duration)this.keepAlive).filter(aLong -> graphQlSession.checkSentOrReceivedMessagesAndClear()).doOnNext(aLong -> graphQlSession.sendPing()).then() : Flux.empty())).then();
            if (this.keepAlive != null) {
                Flux.interval((Duration)this.keepAlive, (Duration)this.keepAlive).filter(aLong -> graphQlSession.checkSentOrReceivedMessagesAndClear()).doOnNext(aLong -> graphQlSession.sendPing()).subscribe();
            }
            return Mono.zip((Mono)sendCompletion, (Mono)receiveCompletion.then()).then();
        }

        private boolean sessionNotInitialized() {
            return !Boolean.TRUE.equals(this.graphQlSessionSink.scan(Scannable.Attr.TERMINATED));
        }

        private void registerCloseStatusHandling(GraphQlSession graphQlSession, WebSocketSession session) {
            session.closeStatus().defaultIfEmpty((Object)CloseStatus.NO_STATUS_CODE).doOnNext(closeStatus -> {
                String closeStatusMessage = this.initCloseStatusMessage((CloseStatus)closeStatus, null, graphQlSession);
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)closeStatusMessage);
                }
                graphQlSession.terminateRequests(closeStatusMessage, (CloseStatus)closeStatus);
            }).doOnError(cause -> {
                CloseStatus closeStatus = CloseStatus.NO_STATUS_CODE;
                String closeStatusMessage = this.initCloseStatusMessage(closeStatus, (Throwable)cause, graphQlSession);
                if (logger.isErrorEnabled()) {
                    logger.error((Object)closeStatusMessage);
                }
                graphQlSession.terminateRequests(closeStatusMessage, closeStatus);
            }).subscribe();
        }

        private String initCloseStatusMessage(CloseStatus status, @Nullable Throwable ex, GraphQlSession session) {
            String reason = session + " disconnected";
            reason = this.isStopped() ? session + " was stopped" : (ex != null ? reason + ", closeStatus() completed with error " + ex : (!status.equals((Object)CloseStatus.NO_STATUS_CODE) ? reason + " with " + status : reason + " without a status"));
            return reason;
        }

        void handleWebSocketSessionError(Throwable ex) {
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Session handling error: " + ex.getMessage()), ex);
            } else if (logger.isErrorEnabled()) {
                logger.error((Object)("Session handling error: " + ex.getMessage()));
            }
            this.graphQlSessionSink.tryEmitError(ex);
            this.graphQlSessionSink = Sinks.unsafe().one();
        }

        void handleWebSocketSessionClosed() {
            this.graphQlSessionSink = Sinks.unsafe().one();
        }
    }

    private static class GraphQlSession {
        private final DisposableConnection connection;
        private final AtomicLong requestIndex = new AtomicLong();
        private final RequestSink requestSink = new RequestSink();
        private final Map<String, RequestState> requestStateMap = new ConcurrentHashMap<String, RequestState>();
        private boolean hasReceivedMessages;

        GraphQlSession(WebSocketSession webSocketSession) {
            this.connection = DisposableConnection.from(webSocketSession);
        }

        Flux<GraphQlWebSocketMessage> getRequestFlux() {
            return this.requestSink.getRequestFlux();
        }

        Mono<GraphQlResponse> execute(GraphQlRequest request) {
            String id = String.valueOf(this.requestIndex.incrementAndGet());
            return Mono.create(sink -> {
                SingleResponseRequestState state = new SingleResponseRequestState(request, (MonoSink<GraphQlResponse>)sink);
                this.requestStateMap.put(id, state);
                try {
                    GraphQlWebSocketMessage message = GraphQlWebSocketMessage.subscribe(id, request);
                    this.requestSink.sendRequest(message);
                }
                catch (Exception ex) {
                    this.requestStateMap.remove(id);
                    sink.error((Throwable)ex);
                }
            }).doOnCancel(() -> this.requestStateMap.remove(id));
        }

        Flux<GraphQlResponse> executeSubscription(GraphQlRequest request) {
            String id = String.valueOf(this.requestIndex.incrementAndGet());
            return Flux.create(sink -> {
                SubscriptionRequestState state = new SubscriptionRequestState(request, (FluxSink<GraphQlResponse>)sink);
                this.requestStateMap.put(id, state);
                try {
                    GraphQlWebSocketMessage message = GraphQlWebSocketMessage.subscribe(id, request);
                    this.requestSink.sendRequest(message);
                }
                catch (Exception ex) {
                    this.requestStateMap.remove(id);
                    sink.error((Throwable)ex);
                }
            }).doOnCancel(() -> this.stopSubscription(id));
        }

        private void stopSubscription(String id) {
            RequestState state = this.requestStateMap.remove(id);
            if (state != null) {
                try {
                    this.requestSink.sendRequest(GraphQlWebSocketMessage.complete(id));
                }
                catch (Exception ex) {
                    if (logger.isErrorEnabled()) {
                        logger.error((Object)("Closing " + this.connection.getDescription() + " after failure to send 'complete' for subscription id='" + id + "'."));
                    }
                    this.connection.close(CloseStatus.PROTOCOL_ERROR).subscribe();
                }
            }
        }

        void sendPong(@Nullable Map<String, Object> payload) {
            GraphQlWebSocketMessage message = GraphQlWebSocketMessage.pong(payload);
            this.requestSink.sendRequest(message);
        }

        void sendPing() {
            GraphQlWebSocketMessage message = GraphQlWebSocketMessage.ping(null);
            this.requestSink.sendRequest(message);
        }

        boolean checkSentOrReceivedMessagesAndClear() {
            boolean received = this.hasReceivedMessages;
            this.hasReceivedMessages = false;
            return this.requestSink.checkSentMessagesAndClear() || received;
        }

        void handleNext(GraphQlWebSocketMessage message) {
            String id = message.getId();
            RequestState requestState = this.requestStateMap.get(id);
            if (requestState == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("No receiver for: " + message));
                }
                return;
            }
            this.hasReceivedMessages = true;
            if (requestState instanceof SingleResponseRequestState) {
                this.requestStateMap.remove(id);
            }
            Map payload = (Map)message.getPayload();
            ResponseMapGraphQlResponse graphQlResponse = new ResponseMapGraphQlResponse(payload);
            requestState.handleResponse(graphQlResponse);
        }

        void handleError(GraphQlWebSocketMessage message) {
            String id = message.getId();
            RequestState requestState = this.requestStateMap.remove(id);
            if (requestState == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("No receiver for: " + message));
                }
                return;
            }
            List errorList = (List)message.getPayload();
            ResponseMapGraphQlResponse response = new ResponseMapGraphQlResponse(Collections.singletonMap("errors", errorList));
            if (requestState instanceof SingleResponseRequestState) {
                requestState.handleResponse(response);
            } else {
                List<ResponseError> errors = response.getErrors();
                SubscriptionErrorException ex = new SubscriptionErrorException(requestState.request(), errors);
                requestState.handlerError((Throwable)((Object)ex));
            }
        }

        void handleComplete(GraphQlWebSocketMessage message) {
            String id = message.getId();
            RequestState requestState = this.requestStateMap.remove(id);
            if (requestState == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("No receiver for': " + message));
                }
                return;
            }
            requestState.handleCompletion();
        }

        Mono<Void> notifyWhenClosed() {
            return this.connection.notifyWhenClosed();
        }

        Mono<Void> close() {
            return this.connection.close(CloseStatus.GOING_AWAY);
        }

        void terminateRequests(String message, CloseStatus status) {
            this.requestStateMap.values().forEach(info -> info.emitDisconnectError(message, status));
            this.requestStateMap.clear();
        }

        public String toString() {
            return "GraphQlSession over " + this.connection.getDescription();
        }
    }

    /**
     * {@link RequestState} for a subscription: signals are forwarded to a
     * {@link FluxSink}.
     *
     * @param request the request that started the subscription
     * @param responseSink the sink that receives responses, errors, and completion
     */
    private record SubscriptionRequestState(GraphQlRequest request, FluxSink<GraphQlResponse> responseSink) implements RequestState {

        /** Emit the response as the next item. */
        @Override
        public void handleResponse(GraphQlResponse response) {
            responseSink().next(response);
        }

        /** Terminate the sink with the given error. */
        @Override
        public void handlerError(Throwable ex) {
            responseSink().error(ex);
        }

        /** Complete the sink. */
        @Override
        public void handleCompletion() {
            responseSink().complete();
        }

        /** Terminate the sink with the disconnect exception. */
        @Override
        public void emitDisconnectError(WebSocketDisconnectedException ex) {
            handlerError(ex);
        }
    }

    /**
     * {@link RequestState} for a single-response request: signals are
     * forwarded to a {@link MonoSink}.
     *
     * @param request the request that was sent
     * @param responseSink the sink that receives the response, an error, or empty completion
     */
    private record SingleResponseRequestState(GraphQlRequest request, MonoSink<GraphQlResponse> responseSink) implements RequestState {

        /** Complete the sink with the response. */
        @Override
        public void handleResponse(GraphQlResponse response) {
            responseSink().success(response);
        }

        /** Terminate the sink with the given error. */
        @Override
        public void handlerError(Throwable ex) {
            responseSink().error(ex);
        }

        /** Complete the sink without a value. */
        @Override
        public void handleCompletion() {
            responseSink().success();
        }

        /** Terminate the sink with the disconnect exception. */
        @Override
        public void emitDisconnectError(WebSocketDisconnectedException ex) {
            handlerError(ex);
        }
    }

    private static interface RequestState {
        public GraphQlRequest request();

        public void handleResponse(GraphQlResponse var1);

        public void handlerError(Throwable var1);

        public void handleCompletion();

        default public void emitDisconnectError(String message, CloseStatus closeStatus) {
            this.emitDisconnectError(new WebSocketDisconnectedException(message, this.request(), closeStatus));
        }

        public void emitDisconnectError(WebSocketDisconnectedException var1);
    }

    /**
     * Holder for the outbound message flux and the {@link FluxSink} of its
     * single subscriber; also records whether anything was sent since the
     * last check.
     */
    private static final class RequestSink {
        /** Sink of the subscriber to {@link #requestFlux}; {@code null} until it is subscribed to. */
        @Nullable
        private FluxSink<GraphQlWebSocketMessage> requestSink;
        private boolean hasSentMessages;
        private final Flux<GraphQlWebSocketMessage> requestFlux;

        private RequestSink() {
            this.requestFlux = Flux.create(this::registerSubscriber);
        }

        /** Remember the subscriber's sink, rejecting a second subscriber. */
        private void registerSubscriber(FluxSink<GraphQlWebSocketMessage> sink) {
            Assert.state(this.requestSink == null, "Expected single subscriber only for outbound messages");
            this.requestSink = sink;
        }

        /** Return the flux of outbound messages. */
        Flux<GraphQlWebSocketMessage> getRequestFlux() {
            return requestFlux;
        }

        /**
         * Emit a message to the subscriber and mark that a message was sent.
         *
         * @throws IllegalStateException if the flux has not been subscribed to yet
         */
        void sendRequest(GraphQlWebSocketMessage message) {
            FluxSink<GraphQlWebSocketMessage> sink = this.requestSink;
            Assert.state(sink != null, "Unexpected request before Flux is subscribed to");
            this.hasSentMessages = true;
            sink.next(message);
        }

        /** Return whether a message was sent since the previous call, and reset the flag. */
        boolean checkSentMessagesAndClear() {
            boolean sentSinceLastCheck = this.hasSentMessages;
            this.hasSentMessages = false;
            return sentSinceLastCheck;
        }
    }

    private static interface DisposableConnection {
        public Mono<Void> close(CloseStatus var1);

        public Mono<Void> notifyWhenClosed();

        public String getDescription();

        public static DisposableConnection from(final WebSocketSession session) {
            return new DisposableConnection(){

                @Override
                public Mono<Void> close(CloseStatus status) {
                    return session.close(status);
                }

                @Override
                public Mono<Void> notifyWhenClosed() {
                    return session.closeStatus().then();
                }

                @Override
                public String getDescription() {
                    return session.toString();
                }
            };
        }
    }
}

