package org.springframework.cloud.gateway.test.websocket;

import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.cloud.gateway.test.PermitAllSecurityConfiguration;
import org.springframework.cloud.gateway.test.support.HttpServer;
import org.springframework.cloud.gateway.test.support.ReactorHttpServer;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.ServiceInstanceListSuppliers;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.Lifecycle;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/springframework/cloud/gateway/test/websocket/WebSocketIntegrationTests.class */
public class WebSocketIntegrationTests {
    // Port of the backend WebSocket server; assigned in setup() from server.getPort().
    protected int serverPort;
    // WebSocket client the tests connect with; created in setup().
    private WebSocketClient client;
    // Backend HTTP server hosting the WebSocket handlers; started in setup(), stopped in stop().
    private HttpServer server;
    // Application context of the gateway under test; started in setup().
    private ConfigurableApplicationContext gatewayContext;
    // Port read from the gateway's "local.server.port" property in setup().
    private int gatewayPort;
    private static final Log logger = LogFactory.getLog(WebSocketIntegrationTests.class);
    // Receives the close status seen by ClientClosingHandler; read by clientClosing().
    // Static, so the same sink instance is shared by every test instance of this class.
    private static final Sinks.One<CloseStatus> serverCloseStatusSink = Sinks.one();

    /* loaded from: input_file:org/springframework/cloud/gateway/test/websocket/WebSocketIntegrationTests$ClientClosingHandler.class */
    private static class ClientClosingHandler implements WebSocketHandler {
        private ClientClosingHandler() {
        }

        public Mono<Void> handle(WebSocketSession webSocketSession) {
            Mono closeStatus = webSocketSession.closeStatus();
            Sinks.One one = WebSocketIntegrationTests.serverCloseStatusSink;
            one.getClass();
            return closeStatus.doOnNext((v1) -> {
                r1.tryEmitValue(v1);
            }).then();
        }
    }

    /* loaded from: input_file:org/springframework/cloud/gateway/test/websocket/WebSocketIntegrationTests$CustomHeaderHandler.class */
    private static class CustomHeaderHandler implements WebSocketHandler {
        private CustomHeaderHandler() {
        }

        public Mono<Void> handle(WebSocketSession webSocketSession) {
            HttpHeaders headers = webSocketSession.getHandshakeInfo().getHeaders();
            return !headers.containsKey("my-header") ? Mono.error(new IllegalStateException("Missing my-header")) : WebSocketIntegrationTests.doSend(webSocketSession, Mono.just(webSocketSession.textMessage("my-header:" + headers.getFirst("my-header"))));
        }
    }

    /* loaded from: input_file:org/springframework/cloud/gateway/test/websocket/WebSocketIntegrationTests$EchoWebSocketHandler.class */
    private static class EchoWebSocketHandler implements WebSocketHandler {
        private EchoWebSocketHandler() {
        }

        public Mono<Void> handle(WebSocketSession webSocketSession) {
            return webSocketSession.send(webSocketSession.receive().doOnNext((v0) -> {
                v0.retain();
            }));
        }
    }

    /**
     * Spring configuration of the gateway application started by {@code setup()}.
     *
     * <p>Declares a load-balancer client named {@code wsservice} (configured by
     * {@link LocalLoadBalancerClientConfiguration}) and the routes that forward to it.
     */
    @Configuration(proxyBeanMethods = false)
    @EnableAutoConfiguration
    @LoadBalancerClient(name = "wsservice", configuration = {LocalLoadBalancerClientConfiguration.class})
    @Import({PermitAllSecurityConfiguration.class})
    protected static class GatewayConfig {
        protected GatewayConfig() {
        }

        /**
         * Builds the two test routes, in this order:
         * <ol>
         * <li>path {@code /echoForHttp} is sent to {@code lb://wsservice};</li>
         * <li>an always-true predicate sends everything else to {@code lb:ws://wsservice}.</li>
         * </ol>
         *
         * @param routeLocatorBuilder builder supplied by the gateway
         * @return the route locator holding both routes
         */
        @Bean
        public RouteLocator wsRouteLocator(RouteLocatorBuilder routeLocatorBuilder) {
            return routeLocatorBuilder.routes()
                    .route(httpSchemeRoute -> httpSchemeRoute.path("/echoForHttp").uri("lb://wsservice"))
                    .route(catchAllRoute -> catchAllRoute.alwaysTrue().uri("lb:ws://wsservice"))
                    .build();
        }
    }

    /**
     * Load-balancer configuration for {@code wsservice}: a fixed list with one instance on
     * {@code localhost} at the port given by the {@code ws.server.port} property.
     */
    public static class LocalLoadBalancerClientConfiguration {

        /** Backend port, injected from {@code ws.server.port}. */
        @Value("${ws.server.port}")
        private int wsPort;

        /**
         * @param environment not used by this method
         * @return a supplier that always offers the single local {@code wsservice-1} instance
         */
        @Bean
        public ServiceInstanceListSupplier staticServiceInstanceListSupplier(Environment environment) {
            ServiceInstance localInstance =
                    new DefaultServiceInstance("wsservice-1", "wsservice", "localhost", this.wsPort, false);
            return ServiceInstanceListSuppliers.from("wsservice", localInstance);
        }
    }

    /* loaded from: input_file:org/springframework/cloud/gateway/test/websocket/WebSocketIntegrationTests$ServerClosingHandler.class */
    private static class ServerClosingHandler implements WebSocketHandler {
        private ServerClosingHandler() {
        }

        public Mono<Void> handle(WebSocketSession webSocketSession) {
            return Flux.never().mergeWith(webSocketSession.close(CloseStatus.create(4999, "server-close"))).then();
        }
    }

    /* loaded from: input_file:org/springframework/cloud/gateway/test/websocket/WebSocketIntegrationTests$SubProtocolWebSocketHandler.class */
    private static class SubProtocolWebSocketHandler implements WebSocketHandler {
        private SubProtocolWebSocketHandler() {
        }

        public List<String> getSubProtocols() {
            return Arrays.asList("echo-v1", "echo-v2");
        }

        public Mono<Void> handle(WebSocketSession webSocketSession) {
            String subProtocol = webSocketSession.getHandshakeInfo().getSubProtocol();
            if (!StringUtils.hasText(subProtocol)) {
                return Mono.error(new IllegalStateException("Missing protocol"));
            }
            Assertions.assertThat(webSocketSession.getHandshakeInfo().getHeaders().get("Sec-WebSocket-Protocol")).contains(new String[]{"echo-v1,echo-v2"});
            return WebSocketIntegrationTests.doSend(webSocketSession, Mono.just(webSocketSession.textMessage(subProtocol)));
        }
    }

    /**
     * Spring configuration of the backend server: a {@link DispatcherHandler}, the
     * WebSocket handler adapter and service, and the URL-to-handler mapping.
     */
    @Configuration(proxyBeanMethods = false)
    static class WebSocketTestConfig {
        WebSocketTestConfig() {
        }

        /** @return a new {@link DispatcherHandler} */
        @Bean
        public DispatcherHandler webHandler() {
            return new DispatcherHandler();
        }

        /**
         * @return an adapter built around the result of {@link #webSocketService()}
         */
        @Bean
        public WebSocketHandlerAdapter handlerAdapter() {
            // NOTE(review): with proxyBeanMethods = false this is presumably a plain method call,
            // producing a service instance separate from the webSocketService bean — confirm.
            WebSocketService service = webSocketService();
            return new WebSocketHandlerAdapter(service);
        }

        /** @return a handshake service using the strategy from {@link #getUpgradeStrategy()} */
        @Bean
        public WebSocketService webSocketService() {
            RequestUpgradeStrategy strategy = getUpgradeStrategy();
            return new HandshakeWebSocketService(strategy);
        }

        /** @return the upgrade strategy to use; a Reactor Netty strategy here */
        protected RequestUpgradeStrategy getUpgradeStrategy() {
            return new ReactorNettyRequestUpgradeStrategy();
        }

        /**
         * Maps each test path to its server-side handler.
         *
         * @return a {@link SimpleUrlHandlerMapping} populated with the six test endpoints
         */
        @Bean
        public HandlerMapping handlerMapping() {
            HashMap<String, WebSocketHandler> handlersByPath = new HashMap<>();
            handlersByPath.put("/echo", new EchoWebSocketHandler());
            handlersByPath.put("/echoForHttp", new EchoWebSocketHandler());
            handlersByPath.put("/sub-protocol", new SubProtocolWebSocketHandler());
            handlersByPath.put("/custom-header", new CustomHeaderHandler());
            handlersByPath.put("/server-close", new ServerClosingHandler());
            handlersByPath.put("/client-close", new ClientClosingHandler());

            SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
            urlMapping.setUrlMap(handlersByPath);
            return urlMapping;
        }
    }

    /**
     * Sends the given messages through the session.
     *
     * @param webSocketSession the session to send on
     * @param publisher the messages to send
     * @return the {@code Mono} returned by {@link WebSocketSession#send}
     */
    public static Mono<Void> doSend(WebSocketSession webSocketSession, Publisher<WebSocketMessage> publisher) {
        final Mono<Void> sendCompletion = webSocketSession.send(publisher);
        return sendCompletion;
    }

    /**
     * Starts the backend WebSocket server, the client and the gateway before each test.
     *
     * <p>The backend port is handed to the gateway through the {@code ws.server.port}
     * property; the gateway's own port is read back from {@code local.server.port}.
     *
     * @throws Exception if the server or the gateway fails to start
     */
    @Before
    public void setup() throws Exception {
        this.client = new ReactorNettyWebSocketClient();

        this.server = new ReactorHttpServer();
        this.server.setHandler(createHttpHandler());
        this.server.afterPropertiesSet();
        this.server.start();
        this.serverPort = this.server.getPort();

        // WebSocketClient declares no start(); the call is only valid through Lifecycle.
        if (this.client instanceof Lifecycle) {
            ((Lifecycle) this.client).start();
        }

        this.gatewayContext = new SpringApplicationBuilder(GatewayConfig.class)
                .properties("ws.server.port:" + this.serverPort, "server.port=0", "spring.jmx.enabled=false")
                .run();

        ConfigurableEnvironment gatewayEnvironment = this.gatewayContext.getBean(ConfigurableEnvironment.class);
        this.gatewayPort = Integer.parseInt(gatewayEnvironment.getProperty("local.server.port"));
    }

    /**
     * Stops the client, the backend server and the gateway after each test.
     *
     * <p>The server and the gateway context are null-checked because this method also runs
     * when {@code setup()} failed part-way, and a {@link NullPointerException} here would
     * hide the original failure.
     *
     * @throws Exception if stopping a component fails
     */
    @After
    public void stop() throws Exception {
        // WebSocketClient declares no stop(); the call is only valid through Lifecycle.
        if (this.client instanceof Lifecycle) {
            ((Lifecycle) this.client).stop();
        }
        if (this.server != null) {
            this.server.stop();
        }
        if (this.gatewayContext != null) {
            this.gatewayContext.stop();
        }
    }

    /**
     * Builds the backend's {@link HttpHandler} from a freshly refreshed application
     * context that contains {@link WebSocketTestConfig}.
     *
     * @return the handler to install on the backend server
     */
    private HttpHandler createHttpHandler() {
        AnnotationConfigApplicationContext backendContext = new AnnotationConfigApplicationContext();
        backendContext.register(WebSocketTestConfig.class);
        backendContext.refresh();
        WebHttpHandlerBuilder handlerBuilder = WebHttpHandlerBuilder.applicationContext(backendContext);
        return handlerBuilder.build();
    }

    /**
     * Builds a {@code ws://} URI pointing at the gateway on localhost.
     *
     * @param str text appended directly after the port, e.g. {@code "/echo"}
     * @return the resulting URI
     * @throws URISyntaxException if the assembled string is not a valid URI
     */
    protected URI getUrl(String str) throws URISyntaxException {
        final String target = "ws://localhost:" + this.gatewayPort + str;
        return new URI(target);
    }

    /**
     * Builds an {@code http://} URI pointing at the gateway on localhost.
     *
     * @param str text appended directly after the port, e.g. {@code "/echoForHttp"}
     * @return the resulting URI
     * @throws URISyntaxException if the assembled string is not a valid URI
     */
    protected URI getHttpUrl(String str) throws URISyntaxException {
        StringBuilder target = new StringBuilder("http://localhost:");
        target.append(this.gatewayPort).append(str);
        return new URI(target.toString());
    }

    /**
     * Sends 100 text messages to {@code /echo} through a {@code ws://} URL and expects
     * them back unchanged.
     *
     * @throws Exception if the URL cannot be built or the exchange fails
     */
    @Test
    public void echo() throws Exception {
        assertMessagesAreEchoed(getUrl("/echo"));
    }

    /**
     * Same exchange as {@link #echo()}, but against {@code /echoForHttp} through an
     * {@code http://} URL.
     *
     * @throws Exception if the URL cannot be built or the exchange fails
     */
    @Test
    public void echoForHttp() throws Exception {
        assertMessagesAreEchoed(getHttpUrl("/echoForHttp"));
    }

    /**
     * Sends {@code msg-1} to {@code msg-100} to the given endpoint, collects the same
     * number of replies and asserts that they equal what was sent.
     *
     * @param url the endpoint to connect to
     */
    private void assertMessagesAreEchoed(URI url) {
        final int count = 100;
        final Duration timeout = Duration.ofMillis(5000L);
        Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
        // Replays up to 'count' replies so they can be collected after the session ends.
        ReplayProcessor<String> output = ReplayProcessor.create(count);

        this.client.execute(url, session -> {
            logger.debug("Starting to send messages");
            return session.send(input.doOnNext(text -> logger.debug("outbound " + text)).map(session::textMessage))
                    .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
                    .subscribeWith(output)
                    .doOnNext(text -> logger.debug("inbound " + text))
                    .then()
                    .doOnSuccess(ignored -> logger.debug("Done with success"))
                    .doOnError(ex -> logger.debug("Done with " + (ex != null ? ex.getMessage() : "error")));
        }).block(timeout);

        Assertions.assertThat(output.collectList().block(timeout)).isEqualTo(input.collectList().block(timeout));
    }

    /**
     * Connects to {@code /sub-protocol} offering {@code echo-v1} and {@code echo-v2} and
     * asserts that {@code echo-v1} is reported by the handshake and by the server's reply.
     * Currently ignored.
     *
     * @throws Exception if the URL cannot be built or the exchange fails
     */
    @Test
    @Ignore
    public void subProtocol() throws Exception {
        final String firstProtocol = "echo-v1";
        final String secondProtocol = "echo-v2";
        final AtomicReference<HandshakeInfo> capturedHandshake = new AtomicReference<>();
        final MonoProcessor<Object> serverReply = MonoProcessor.create();

        WebSocketHandler clientHandler = new WebSocketHandler() {
            @Override
            public List<String> getSubProtocols() {
                return Arrays.asList(firstProtocol, secondProtocol);
            }

            @Override
            public Mono<Void> handle(WebSocketSession webSocketSession) {
                capturedHandshake.set(webSocketSession.getHandshakeInfo());
                return webSocketSession.receive()
                        .map(WebSocketMessage::getPayloadAsText)
                        .subscribeWith(serverReply)
                        .then();
            }
        };
        this.client.execute(getUrl("/sub-protocol"), clientHandler).block(Duration.ofMillis(5000L));

        HandshakeInfo handshake = capturedHandshake.get();
        HttpHeaders responseHeaders = handshake.getHeaders();
        Assertions.assertThat(responseHeaders.getFirst("Upgrade")).isEqualToIgnoringCase("websocket");
        Assertions.assertThat(handshake.getHeaders().getFirst("Sec-WebSocket-Protocol")).isEqualTo("echo-v1");
        Assertions.assertThat(handshake.getSubProtocol()).as("Wrong protocol accepted").isEqualTo("echo-v1");
        Assertions.assertThat(serverReply.block(Duration.ofSeconds(5L)))
                .as("Wrong protocol detected on the server side")
                .isEqualTo("echo-v1");
    }

    /**
     * Connects to {@code /custom-header} with the header {@code my-header: my-value} and
     * expects the reply {@code "my-header:my-value"}. Currently ignored.
     *
     * @throws Exception if the URL cannot be built or the exchange fails
     */
    @Test
    @Ignore
    public void customHeader() throws Exception {
        HttpHeaders requestHeaders = new HttpHeaders();
        requestHeaders.add("my-header", "my-value");
        MonoProcessor<Object> serverReply = MonoProcessor.create();
        final Duration timeout = Duration.ofMillis(5000L);

        this.client.execute(getUrl("/custom-header"), requestHeaders,
                session -> session.receive()
                        .map(WebSocketMessage::getPayloadAsText)
                        .subscribeWith(serverReply)
                        .then())
                .block(timeout);

        Assertions.assertThat(serverReply.block(timeout)).isEqualTo("my-header:my-value");
    }

    /**
     * Connects to {@code /server-close} and asserts that the client session reports the
     * close status {@code 4999 / "server-close"}.
     *
     * @throws Exception if the URL cannot be built or the exchange fails
     */
    @Test
    public void serverClosing() throws Exception {
        final Duration timeout = Duration.ofMillis(5000L);
        AtomicReference<Mono<CloseStatus>> closeStatusHolder = new AtomicReference<>();

        this.client.execute(getUrl("/server-close"), session -> {
            logger.debug("Starting..");
            // Keep the close-status Mono so it can be awaited after the session ends.
            closeStatusHolder.set(session.closeStatus());
            return session.receive()
                    .doOnNext(message -> logger.debug("inbound " + message))
                    .then()
                    .doFinally(signal -> logger.debug("Completed with: " + signal));
        }).block(timeout);

        CloseStatus observed = closeStatusHolder.get().block(timeout);
        Assertions.assertThat(observed).isEqualTo(CloseStatus.create(4999, "server-close"));
    }

    /**
     * Connects to {@code /client-close}, closes the session from the client with
     * {@code 4999 / "client-close"}, and asserts that the same status reaches
     * {@code serverCloseStatusSink}.
     *
     * @throws Exception if the URL cannot be built or the exchange fails
     */
    @Test
    public void clientClosing() throws Exception {
        final Duration timeout = Duration.ofMillis(5000L);

        this.client.execute(getUrl("/client-close"),
                session -> session.close(CloseStatus.create(4999, "client-close")))
                .block(timeout);

        CloseStatus seenByServer = serverCloseStatusSink.asMono().block(timeout);
        Assertions.assertThat(seenByServer).isEqualTo(CloseStatus.create(4999, "client-close"));
    }
}
