package org.springframework.integration.rsocket;

import io.rsocket.RSocketFactory;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.CompositeMessageCondition;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.MessageCondition;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.annotation.support.RSocketFrameTypeMessageCondition;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;

/* loaded from: input_file:org/springframework/integration/rsocket/ServerRSocketConnector.class */
/**
 * Server-side RSocket connector: builds an RSocket server on top of the supplied
 * {@link ServerTransport}, keeps track of the {@link RSocketRequester} of every client
 * that completes a connection setup, and publishes an {@link RSocketConnectedEvent}
 * for each such client.
 */
public class ServerRSocketConnector extends AbstractRSocketConnector implements ApplicationEventPublisherAware {
    private final ServerTransport<CloseableChannel> serverTransport;
    private Consumer<RSocketFactory.ServerRSocketFactory> factoryConfigurer;
    /** Assigned in {@link #afterPropertiesSet()}; {@code null} until then. */
    private Mono<CloseableChannel> serverMono;

    /**
     * Message handler that additionally registers a handler method for connection-setup
     * frames and stores the requester of each connected client under a key computed by
     * the configured strategy.
     */
    public static class ServerRSocketMessageHandler extends IntegrationRSocketMessageHandler {
        /** Name of the message header from which the client's {@link RSocketRequester} is read. */
        private static final String RSOCKET_REQUESTER_HEADER = "rsocketRequester";

        private static final Method HANDLE_CONNECTION_SETUP_METHOD =
                ReflectionUtils.findMethod(ServerRSocketMessageHandler.class, "handleConnectionSetup", Message.class);

        // NOTE(review): plain HashMap written from handleConnectionSetup and read through the
        // connector's getters; if those can run on different threads a concurrent map is
        // needed - confirm the threading model before changing.
        private final Map<Object, RSocketRequester> clientRSocketRequesters = new HashMap<>();

        /** Default strategy: the setup payload decoded as a UTF-8 string is the client key. */
        private BiFunction<Map<String, Object>, DataBuffer, Object> clientRSocketKeyStrategy =
                (headers, data) -> data.toString(StandardCharsets.UTF_8);

        private ApplicationEventPublisher applicationEventPublisher;

        private ServerRSocketMessageHandler() {
        }

        /**
         * Registers {@code handleConnectionSetup} as the handler for connect frames on
         * any destination (pattern {@code "*"}).
         */
        public void registerHandleConnectionSetupMethod() {
            registerHandlerMethod(this, HANDLE_CONNECTION_SETUP_METHOD,
                    new CompositeMessageCondition(
                            RSocketFrameTypeMessageCondition.CONNECT_CONDITION,
                            new DestinationPatternsMessageCondition(new String[]{"*"}, obtainRouteMatcher())));
        }

        /**
         * Stores the connecting client's requester under the key produced by the key
         * strategy, then publishes an {@link RSocketConnectedEvent}; when no publisher is
         * configured the event is logged at INFO level instead.
         *
         * @param message the connection-setup message
         */
        @SuppressWarnings("unused") // looked up reflectively, see HANDLE_CONNECTION_SETUP_METHOD
        private void handleConnectionSetup(Message<DataBuffer> message) {
            DataBuffer dataBuffer = message.getPayload();
            Map<String, Object> headers = message.getHeaders();
            Object clientKey = this.clientRSocketKeyStrategy.apply(headers, dataBuffer);
            // The typed get(key, type) accessor exists on the headers type returned by
            // getHeaders(), not on java.util.Map, so it is invoked on that return value directly.
            RSocketRequester rsocketRequester =
                    message.getHeaders().get(RSOCKET_REQUESTER_HEADER, RSocketRequester.class);
            this.clientRSocketRequesters.put(clientKey, rsocketRequester);
            RSocketConnectedEvent connectedEvent =
                    new RSocketConnectedEvent(this, headers, dataBuffer, rsocketRequester);
            if (this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent(connectedEvent);
            } else if (this.logger.isInfoEnabled()) {
                this.logger.info("The RSocket has been connected: " + connectedEvent);
            }
        }
    }

    /**
     * Creates a connector over a TCP server transport.
     *
     * @param str the address to bind to
     * @param i the port to bind to
     */
    public ServerRSocketConnector(String str, int i) {
        this((ServerTransport<CloseableChannel>) TcpServerTransport.create(str, i));
    }

    /**
     * Creates a connector over a WebSocket server transport.
     *
     * @param httpServer the HTTP server the WebSocket transport is created from
     */
    public ServerRSocketConnector(HttpServer httpServer) {
        this((ServerTransport<CloseableChannel>) WebsocketServerTransport.create(httpServer));
    }

    /**
     * Creates a connector over an arbitrary server transport.
     *
     * @param serverTransport the transport to serve on; must not be {@code null}
     */
    public ServerRSocketConnector(ServerTransport<CloseableChannel> serverTransport) {
        super(new ServerRSocketMessageHandler());
        // No-op by default; replaced through setFactoryConfigurer.
        this.factoryConfigurer = serverRSocketFactory -> {
        };
        Assert.notNull(serverTransport, "'serverTransport' must not be null");
        this.serverTransport = serverTransport;
    }

    /**
     * Sets a callback applied to the server factory in {@link #afterPropertiesSet()}
     * before the acceptor and transport are configured.
     *
     * @param consumer the factory customizer; must not be {@code null}
     */
    public void setFactoryConfigurer(Consumer<RSocketFactory.ServerRSocketFactory> consumer) {
        Assert.notNull(consumer, "'factoryConfigurer' must not be null");
        this.factoryConfigurer = consumer;
    }

    /**
     * Sets the strategy that computes the key under which a connecting client's
     * requester is stored, from the setup message headers and payload.
     *
     * @param biFunction the key strategy; must not be {@code null}
     */
    public void setClientRSocketKeyStrategy(BiFunction<Map<String, Object>, DataBuffer, Object> biFunction) {
        Assert.notNull(biFunction, "'clientRSocketKeyStrategy' must not be null");
        serverRSocketMessageHandler().clientRSocketKeyStrategy = biFunction;
    }

    /**
     * Passes the publisher to the message handler, which uses it to publish
     * {@link RSocketConnectedEvent}s.
     *
     * @param applicationEventPublisher the publisher to use
     */
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        serverRSocketMessageHandler().applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Builds the server pipeline: applies the factory configurer, installs the message
     * handler's responder as acceptor, binds the transport and caches the result.
     */
    @Override // org.springframework.integration.rsocket.AbstractRSocketConnector
    public void afterPropertiesSet() {
        super.afterPropertiesSet();
        RSocketFactory.ServerRSocketFactory serverFactory = RSocketFactory.receive();
        this.factoryConfigurer.accept(serverFactory);
        this.serverMono = serverFactory
                .acceptor(serverRSocketMessageHandler().responder())
                .transport(this.serverTransport)
                .start()
                .cache();
    }

    /**
     * Returns a read-only view of the requesters of connected clients, keyed by the
     * value produced by the client key strategy.
     *
     * @return an unmodifiable view of the client requesters
     */
    public Map<Object, RSocketRequester> getClientRSocketRequesters() {
        return Collections.unmodifiableMap(serverRSocketMessageHandler().clientRSocketRequesters);
    }

    /**
     * Looks up the requester stored for the given client key.
     *
     * @param obj the client key
     * @return the stored requester, or {@code null} if none is stored under that key
     */
    @Nullable
    public RSocketRequester getClientRSocketRequester(Object obj) {
        return serverRSocketMessageHandler().clientRSocketRequesters.get(obj);
    }

    /**
     * Returns the port of the server channel's address.
     *
     * @return a {@link Mono} emitting the port taken from the channel address
     */
    public Mono<Integer> getBoundPort() {
        return this.serverMono.map(channel -> channel.address().getPort());
    }

    private ServerRSocketMessageHandler serverRSocketMessageHandler() {
        return (ServerRSocketMessageHandler) this.rSocketMessageHandler;
    }

    /**
     * Subscribes to the server {@link Mono} built in {@link #afterPropertiesSet()}.
     */
    @Override // org.springframework.integration.rsocket.AbstractRSocketConnector
    protected void doStart() {
        this.serverMono.subscribe();
    }

    /**
     * Disposes the server channel. Does nothing when {@link #afterPropertiesSet()} has
     * not run, since there is no server pipeline to dispose in that case.
     */
    public void destroy() {
        Mono<CloseableChannel> server = this.serverMono;
        if (server == null) {
            return;
        }
        server.doOnNext(channel -> channel.dispose()).subscribe();
    }

    /**
     * After the superclass callback, registers the connection-setup handler method on
     * the message handler.
     */
    @Override // org.springframework.integration.rsocket.AbstractRSocketConnector
    public void afterSingletonsInstantiated() {
        super.afterSingletonsInstantiated();
        serverRSocketMessageHandler().registerHandleConnectionSetupMethod();
    }
}
