/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.gateway.test.websocket;

import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.cloud.gateway.test.PermitAllSecurityConfiguration;
import org.springframework.cloud.gateway.test.support.HttpServer;
import org.springframework.cloud.gateway.test.support.ReactorHttpServer;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.ServiceInstanceListSuppliers;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.Lifecycle;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseCookie;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@DisabledIfEnvironmentVariable(named="GITHUB_ACTIONS", matches="true")
public class WebSocketIntegrationTests {
    /** Timeout handed to {@code block(...)} when a test waits on a client pipeline. */
    private static final Duration TIMEOUT = Duration.ofMillis(5000L);
    private static final Log logger = LogFactory.getLog(WebSocketIntegrationTests.class);
    /** Port of the backing server, read from {@link #server} during {@link #setup()}. */
    protected int serverPort;
    /** WebSocket client used by every test; created in {@link #setup()}. */
    private WebSocketClient client;
    /** Backing server that hosts the handlers registered by {@code WebSocketTestConfig}. */
    private HttpServer server;
    /** Gateway application context started in {@link #setup()}. */
    private ConfigurableApplicationContext gatewayContext;
    /** Gateway port, taken from the {@code local.server.port} property during {@link #setup()}. */
    private int gatewayPort;
    /**
     * Receives the close status observed by {@code ClientClosingHandler} and is awaited by
     * {@link #clientClosing()}. The field is static, so one sink instance is shared by all
     * test instances of this class.
     */
    private static final Sinks.One<CloseStatus> serverCloseStatusSink = Sinks.one();

    /**
     * Starts the backing WebSocket server, the WebSocket client and the gateway
     * application before each test, and records the ports of the server and the gateway.
     *
     * <p>The gateway is started with {@code server.port=0}; the port it actually bound
     * is read back from the {@code local.server.port} property.
     *
     * @throws Exception if the backing server or the gateway fails to start
     */
    @BeforeEach
    public void setup() throws Exception {
        this.client = new ReactorNettyWebSocketClient();

        this.server = new ReactorHttpServer();
        this.server.setHandler(createHttpHandler());
        this.server.afterPropertiesSet();
        this.server.start();
        this.serverPort = this.server.getPort();

        if (this.client instanceof Lifecycle) {
            ((Lifecycle) this.client).start();
        }

        // ws.server.port tells LocalLoadBalancerClientConfiguration where the backing server listens.
        this.gatewayContext = new SpringApplicationBuilder(GatewayConfig.class)
                .properties("ws.server.port:" + this.serverPort, "server.port=0", "spring.jmx.enabled=false")
                .run();

        // getRequiredProperty fails with a descriptive IllegalStateException when the
        // property is absent, instead of a NumberFormatException from parsing null.
        this.gatewayPort = this.gatewayContext.getEnvironment()
                .getRequiredProperty("local.server.port", Integer.class);
    }

    /**
     * Releases everything {@link #setup()} created: the client, the backing server and
     * the gateway context.
     *
     * <p>Each step runs in a {@code finally} block so that a failure while stopping one
     * resource does not prevent the others from being released. The gateway context is
     * closed rather than merely stopped so that its beans are destroyed after each test.
     *
     * @throws Exception if stopping the client or the server fails
     */
    @AfterEach
    public void stop() throws Exception {
        try {
            if (this.client instanceof Lifecycle) {
                ((Lifecycle) this.client).stop();
            }
        } finally {
            try {
                // setup() may have failed before the server was assigned.
                if (this.server != null) {
                    this.server.stop();
                }
            } finally {
                if (this.gatewayContext != null) {
                    this.gatewayContext.close();
                }
            }
        }
    }

    /**
     * Builds the {@link HttpHandler} for the backing server from a refreshed
     * application context that contains {@link WebSocketTestConfig}.
     *
     * @return the handler assembled by {@link WebHttpHandlerBuilder}
     */
    private HttpHandler createHttpHandler() {
        // This constructor registers the given component class and refreshes the context.
        ApplicationContext serverContext = new AnnotationConfigApplicationContext(WebSocketTestConfig.class);
        return WebHttpHandlerBuilder.applicationContext(serverContext).build();
    }

    /**
     * Resolves a path against the gateway using the {@code ws} scheme.
     *
     * @param path the request path, appended verbatim after the port
     * @return {@code ws://localhost:<gatewayPort><path>}
     * @throws URISyntaxException if the resulting string is not a valid URI
     */
    protected URI getUrl(String path) throws URISyntaxException {
        String location = "ws://localhost:" + this.gatewayPort + path;
        return new URI(location);
    }

    /**
     * Resolves a path against the gateway using the {@code http} scheme.
     *
     * @param path the request path, appended verbatim after the port
     * @return {@code http://localhost:<gatewayPort><path>}
     * @throws URISyntaxException if the resulting string is not a valid URI
     */
    protected URI getHttpUrl(String path) throws URISyntaxException {
        String location = "http://localhost:" + this.gatewayPort + path;
        return new URI(location);
    }

    /**
     * Sends 100 text messages to {@code /echo} through the gateway and expects the same
     * messages back, in the same order.
     */
    @Test
    public void echo() throws Exception {
        int count = 100;
        Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
        AtomicReference<List<String>> actualRef = new AtomicReference<>();

        this.client.execute(getUrl("/echo"), session -> session
                .send(input.map(session::textMessage))
                // Stop after `count` messages so the pipeline completes without the server closing.
                .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
                .collectList()
                .doOnNext(actualRef::set)
                .then())
            .block(TIMEOUT);

        List<String> expected = input.collectList().block();
        Assertions.assertThat(actualRef.get()).isNotNull().isEqualTo(expected);
    }

    /**
     * Same round trip as {@link #echo()}, but the client connects with an {@code http}
     * URL to {@code /echoForHttp}, which {@code GatewayConfig} routes to
     * {@code lb://wsservice} rather than {@code lb:ws://wsservice}.
     */
    @Test
    public void echoForHttp() throws Exception {
        int count = 100;
        Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
        AtomicReference<List<String>> actualRef = new AtomicReference<>();

        this.client.execute(getHttpUrl("/echoForHttp"), session -> {
            logger.debug("Starting to send messages");
            return session
                    .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(session::textMessage))
                    // Stop after `count` messages so the pipeline completes without the server closing.
                    .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
                    .collectList()
                    .doOnNext(actualRef::set)
                    .then();
        }).block(TIMEOUT);

        List<String> expected = input.collectList().block();
        Assertions.assertThat(actualRef.get()).isNotNull().isEqualTo(expected);
    }

    /**
     * Offers two sub-protocols to {@code /sub-protocol} and verifies that the first one
     * is reported both by the client-side handshake info and by the message the server
     * sends back.
     */
    @Test
    public void subProtocol() throws Exception {
        final String protocol = "echo-v1";
        final String protocol2 = "echo-v2";
        final AtomicReference<HandshakeInfo> infoRef = new AtomicReference<>();
        // Holds either the received payload or, on failure, the error signalled by the stream.
        final AtomicReference<Object> protocolRef = new AtomicReference<>();

        this.client.execute(getUrl("/sub-protocol"), new WebSocketHandler() {

            @Override
            public List<String> getSubProtocols() {
                return Arrays.asList(protocol, protocol2);
            }

            @Override
            public Mono<Void> handle(WebSocketSession session) {
                infoRef.set(session.getHandshakeInfo());
                return session.receive()
                        .map(WebSocketMessage::getPayloadAsText)
                        .doOnNext(protocolRef::set)
                        .doOnError(protocolRef::set)
                        .then();
            }
        }).block(TIMEOUT);

        HandshakeInfo info = infoRef.get();
        // Fail with a readable message rather than an NPE when the handler never ran.
        Assertions.assertThat(info).as("Handshake info was not captured").isNotNull();
        Assertions.assertThat(info.getHeaders().getFirst("Upgrade")).isEqualToIgnoringCase("websocket");
        Assertions.assertThat(info.getHeaders().getFirst("Sec-WebSocket-Protocol")).isEqualTo(protocol);
        Assertions.assertThat(info.getSubProtocol()).as("Wrong protocol accepted").isEqualTo(protocol);
        Assertions.assertThat(protocolRef.get()).as("Wrong protocol detected on the server side").isEqualTo(protocol);
    }

    /**
     * Sends a {@code my-header} handshake header to {@code /custom-header} and expects
     * the server to reply with {@code my-header:my-value}.
     */
    @Test
    public void customHeader() throws Exception {
        HttpHeaders headers = new HttpHeaders();
        headers.add("my-header", "my-value");
        // Holds either the received payload or, on failure, the error signalled by the stream.
        AtomicReference<Object> headerRef = new AtomicReference<>();

        this.client.execute(getUrl("/custom-header"), headers, session -> session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(headerRef::set)
                .doOnError(headerRef::set)
                .then())
            .block(TIMEOUT);

        Assertions.assertThat(headerRef.get()).isEqualTo("my-header:my-value");
    }

    /**
     * Connects to {@code /server-close} and verifies that the close status observed by
     * the client is {@code 4999 "server-close"}.
     */
    @Test
    public void serverClosing() throws Exception {
        AtomicReference<Mono<CloseStatus>> closeStatus = new AtomicReference<>();

        this.client.execute(getUrl("/server-close"), session -> {
            logger.debug("Starting..");
            closeStatus.set(session.closeStatus());
            return session.receive()
                    .doOnNext(s -> logger.debug("inbound " + s))
                    .then()
                    .doFinally(signalType -> logger.debug("Completed with: " + signalType));
        }).block(TIMEOUT);

        Mono<CloseStatus> observed = closeStatus.get();
        // Fail with a readable message rather than an NPE when the handler never ran.
        Assertions.assertThat(observed).as("Close status was not captured").isNotNull();
        Assertions.assertThat(observed.block(TIMEOUT)).isEqualTo(CloseStatus.create(4999, "server-close"));
    }

    /**
     * Closes the session from the client side with {@code 4999 "client-close"} and
     * verifies that the server-side handler for {@code /client-close} published the
     * same status to {@code serverCloseStatusSink}.
     */
    @Test
    public void clientClosing() throws Exception {
        CloseStatus expected = CloseStatus.create(4999, "client-close");

        this.client.execute(getUrl("/client-close"), session -> session.close(expected)).block(TIMEOUT);

        Assertions.assertThat(serverCloseStatusSink.asMono().block(TIMEOUT)).isEqualTo(expected);
    }

    /**
     * Connects to {@code /cookie} and expects both the {@code "cookie"} payload and a
     * {@code Set-Cookie: project=spring} handshake header. Currently disabled.
     */
    @Disabled
    @Test
    void cookie() throws Exception {
        AtomicReference<String> cookie = new AtomicReference<>();
        // Holds either the received payload or, on failure, the error signalled by the stream.
        AtomicReference<Object> receivedCookieRef = new AtomicReference<>();

        this.client.execute(getUrl("/cookie"), session -> {
            cookie.set(session.getHandshakeInfo().getHeaders().getFirst("Set-Cookie"));
            return session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .doOnNext(receivedCookieRef::set)
                    .doOnError(receivedCookieRef::set)
                    .then();
        }).block(TIMEOUT);

        Assertions.assertThat(receivedCookieRef.get()).isEqualTo("cookie");
        Assertions.assertThat(cookie.get()).isEqualTo("project=spring");
    }

    /**
     * Gateway application configuration used by the tests. Requests to
     * {@code /echoForHttp} go to {@code lb://wsservice}; everything else goes to
     * {@code lb:ws://wsservice}.
     */
    @Configuration(proxyBeanMethods = false)
    @EnableAutoConfiguration
    @Import(PermitAllSecurityConfiguration.class)
    @LoadBalancerClient(name = "wsservice", configuration = LocalLoadBalancerClientConfiguration.class)
    protected static class GatewayConfig {

        /**
         * Declares the two routes, with the path-specific route first and the
         * catch-all route second.
         */
        @Bean
        public RouteLocator wsRouteLocator(RouteLocatorBuilder builder) {
            return builder.routes()
                    .route(r -> r.path("/echoForHttp").uri("lb://wsservice"))
                    .route(r -> r.alwaysTrue().uri("lb:ws://wsservice"))
                    .build();
        }
    }

    /**
     * Configuration of the backing server: the WebFlux dispatcher, the WebSocket
     * handshake infrastructure, the URL-to-handler mapping and a cookie filter.
     */
    @Configuration(proxyBeanMethods = false)
    static class WebSocketTestConfig {

        @Bean
        public DispatcherHandler webHandler() {
            return new DispatcherHandler();
        }

        // NOTE(review): with proxyBeanMethods=false this direct call looks like it builds
        // a second WebSocketService, separate from the webSocketService bean - confirm
        // whether that is intended.
        @Bean
        public WebSocketHandlerAdapter handlerAdapter() {
            WebSocketService service = webSocketService();
            return new WebSocketHandlerAdapter(service);
        }

        @Bean
        public WebSocketService webSocketService() {
            RequestUpgradeStrategy strategy = getUpgradeStrategy();
            return new HandshakeWebSocketService(strategy);
        }

        /** Supplies the upgrade strategy used by {@link #webSocketService()}. */
        protected RequestUpgradeStrategy getUpgradeStrategy() {
            return new ReactorNettyRequestUpgradeStrategy();
        }

        /** Maps each test endpoint path to the handler that serves it. */
        @Bean
        public HandlerMapping handlerMapping() {
            HashMap<String, Object> handlersByPath = new HashMap<>();
            handlersByPath.put("/echo", new EchoWebSocketHandler());
            handlersByPath.put("/echoForHttp", new EchoWebSocketHandler());
            handlersByPath.put("/sub-protocol", new SubProtocolWebSocketHandler());
            handlersByPath.put("/custom-header", new CustomHeaderHandler());
            handlersByPath.put("/server-close", new ServerClosingHandler());
            handlersByPath.put("/client-close", new ClientClosingHandler());
            handlersByPath.put("/cookie", new CookieHandler());

            SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
            urlMapping.setUrlMap(handlersByPath);
            return urlMapping;
        }

        /** Adds a {@code project=spring} cookie to responses for paths starting with {@code /cookie}. */
        @Bean
        public WebFilter cookieWebFilter() {
            return (exchange, chain) -> {
                String requestPath = exchange.getRequest().getPath().value();
                if (requestPath.startsWith("/cookie")) {
                    ResponseCookie projectCookie = ResponseCookie.from("project", "spring").build();
                    exchange.getResponse().addCookie(projectCookie);
                }
                return chain.filter(exchange);
            };
        }
    }

    /**
     * Load-balancer configuration for {@code wsservice}: a single instance on
     * {@code localhost} at the port given by the {@code ws.server.port} property.
     */
    public static class LocalLoadBalancerClientConfiguration {

        @Value("${ws.server.port}")
        private int wsPort;

        /**
         * Supplies the one-element instance list for {@code wsservice}.
         *
         * @param env not used by this method
         */
        @Bean
        public ServiceInstanceListSupplier staticServiceInstanceListSupplier(Environment env) {
            ServiceInstance backend =
                    new DefaultServiceInstance("wsservice-1", "wsservice", "localhost", this.wsPort, false);
            return ServiceInstanceListSuppliers.from("wsservice", backend);
        }
    }

    private static class CookieHandler
    implements WebSocketHandler {
        private CookieHandler() {
        }

        public Mono<Void> handle(WebSocketSession session) {
            WebSocketMessage message = session.textMessage("cookie");
            return session.send((Publisher)Mono.just((Object)message));
        }
    }

    private static class ClientClosingHandler
    implements WebSocketHandler {
        private ClientClosingHandler() {
        }

        public Mono<Void> handle(WebSocketSession session) {
            return session.closeStatus().doOnNext(arg_0 -> serverCloseStatusSink.tryEmitValue(arg_0)).then();
        }
    }

    private static class ServerClosingHandler
    implements WebSocketHandler {
        private ServerClosingHandler() {
        }

        public Mono<Void> handle(WebSocketSession session) {
            return Flux.never().mergeWith((Publisher)session.close(CloseStatus.create((int)4999, (String)"server-close"))).then();
        }
    }

    private static class CustomHeaderHandler
    implements WebSocketHandler {
        private CustomHeaderHandler() {
        }

        public Mono<Void> handle(WebSocketSession session) {
            HttpHeaders headers = session.getHandshakeInfo().getHeaders();
            if (!headers.containsKey((Object)"my-header")) {
                return Mono.error((Throwable)new IllegalStateException("Missing my-header"));
            }
            String payload = "my-header:" + headers.getFirst("my-header");
            WebSocketMessage message = session.textMessage(payload);
            return session.send((Publisher)Mono.just((Object)message));
        }
    }

    private static class SubProtocolWebSocketHandler
    implements WebSocketHandler {
        private SubProtocolWebSocketHandler() {
        }

        public List<String> getSubProtocols() {
            return Arrays.asList("echo-v1", "echo-v2");
        }

        public Mono<Void> handle(WebSocketSession session) {
            String protocol = session.getHandshakeInfo().getSubProtocol();
            if (!StringUtils.hasText((String)protocol)) {
                return Mono.error((Throwable)new IllegalStateException("Missing protocol"));
            }
            List protocols = session.getHandshakeInfo().getHeaders().get((Object)"Sec-WebSocket-Protocol");
            Assertions.assertThat((List)protocols).contains((Object[])new String[]{"echo-v1,echo-v2"});
            WebSocketMessage message = session.textMessage(protocol != null ? protocol : "none");
            return session.send((Publisher)Mono.just((Object)message));
        }
    }

    private static class EchoWebSocketHandler
    implements WebSocketHandler {
        private EchoWebSocketHandler() {
        }

        public Mono<Void> handle(WebSocketSession session) {
            return session.send((Publisher)session.receive().doOnNext(WebSocketMessage::retain));
        }
    }
}

