package reactor.tcp.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.spec.Promises;
import reactor.core.spec.Reactors;
import reactor.event.dispatch.Dispatcher;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.support.NamedDaemonThreadFactory;
import reactor.tcp.TcpConnection;
import reactor.tcp.TcpServer;
import reactor.tcp.config.ServerSocketOptions;
import reactor.tcp.config.SslOptions;
import reactor.tcp.encoding.Codec;
import reactor.tcp.ssl.SSLEngineSupplier;
import reactor.util.Assert;

/* loaded from: input_file:reactor/tcp/netty/NettyTcpServer.class */
/**
 * A {@link TcpServer} implementation backed by a Netty {@link ServerBootstrap}.
 *
 * <p>Two {@link NioEventLoopGroup}s are created per server instance: one passed to the bootstrap
 * as the parent (accepting) group and one as the child (I/O) group. Their sizes are read from the
 * {@code reactor.tcp.selectThreadCount} and {@code reactor.tcp.ioThreadCount} environment
 * properties.
 *
 * <p>{@link #start(Consumer)} and {@link #createConnection(Object)} narrow the return types of the
 * methods they override. The erased bridge methods for those overrides are emitted by the
 * compiler, so none are declared by hand in this class.
 *
 * @param <IN>  type of the decoded inbound data
 * @param <OUT> type of the outbound data before encoding
 */
public class NettyTcpServer<IN, OUT> extends TcpServer<IN, OUT> {
    /** Port bound on the wildcard address when no listen address is supplied. */
    private static final int DEFAULT_PORT = 3000;

    private final Logger log;
    private final ServerBootstrap bootstrap;
    private final Dispatcher eventsDispatcher;
    private final ServerSocketOptions options;
    private final EventLoopGroup selectorGroup;
    private final EventLoopGroup ioGroup;

    /**
     * Builds the event loop groups and the (not yet bound) server bootstrap.
     *
     * @param environment         source of the thread-count properties; also passed to the superclass
     * @param reactor2            reactor whose dispatcher is handed to every created connection
     * @param inetSocketAddress   address to listen on; when {@code null} the server listens on
     *                            port {@value #DEFAULT_PORT} of the wildcard address
     * @param serverSocketOptions socket options applied to the listening socket and to every
     *                            accepted channel; must not be {@code null}
     * @param sslOptions          when non-null, an {@link SslHandler} is installed first in the
     *                            pipeline of every accepted channel
     * @param codec               codec passed to the superclass
     * @param collection          connection consumers passed to the superclass
     */
    protected NettyTcpServer(Environment environment, Reactor reactor2, InetSocketAddress inetSocketAddress, ServerSocketOptions serverSocketOptions, final SslOptions sslOptions, Codec<Buffer, IN, OUT> codec, Collection<Consumer<TcpConnection<IN, OUT>>> collection) {
        super(environment, reactor2, inetSocketAddress, serverSocketOptions, sslOptions, codec, collection);
        this.log = LoggerFactory.getLogger(getClass());
        this.eventsDispatcher = reactor2.getDispatcher();
        Assert.notNull(serverSocketOptions, "ServerSocketOptions cannot be null");
        this.options = serverSocketOptions;

        int selectThreadCount = environment.getProperty("reactor.tcp.selectThreadCount", Integer.class, Environment.PROCESSORS / 2);
        int ioThreadCount = environment.getProperty("reactor.tcp.ioThreadCount", Integer.class, Environment.PROCESSORS);
        this.selectorGroup = new NioEventLoopGroup(selectThreadCount, new NamedDaemonThreadFactory("reactor-tcp-select"));
        this.ioGroup = new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));

        InetSocketAddress bindAddress = null == inetSocketAddress
                ? new InetSocketAddress(DEFAULT_PORT)
                : inetSocketAddress;

        this.bootstrap = new ServerBootstrap()
                .group(this.selectorGroup, this.ioGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, this.options.backlog())
                .option(ChannelOption.SO_RCVBUF, this.options.rcvbuf())
                .option(ChannelOption.SO_SNDBUF, this.options.sndbuf())
                .option(ChannelOption.SO_REUSEADDR, this.options.reuseAddr())
                .localAddress(bindAddress)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(final SocketChannel socketChannel) throws Exception {
                        initAcceptedChannel(socketChannel, sslOptions);
                    }
                });
    }

    /**
     * Binds the server socket asynchronously.
     *
     * <p>The outcome of the bind is always observed: a successful bind is logged and, if a
     * consumer was given, reported through {@code notifyStart}; a failed bind is logged together
     * with its cause and the consumer is <em>not</em> notified, since the server did not start.
     *
     * @param consumer callback to notify once the socket is bound; may be {@code null}
     * @return this server
     */
    @Override // reactor.tcp.TcpServer
    public NettyTcpServer<IN, OUT> start(final Consumer<Void> consumer) {
        this.bootstrap.bind().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (!channelFuture.isSuccess()) {
                    // Do not report a start that never happened.
                    log.error("BIND failed", channelFuture.cause());
                    return;
                }
                log.info("BIND {}", channelFuture.channel().localAddress());
                if (null != consumer) {
                    notifyStart(consumer);
                }
            }
        });
        return this;
    }

    /**
     * Schedules a graceful shutdown of both event loop groups on this server's reactor.
     *
     * @return a promise that is completed, after {@code notifyShutdown()} has been called, once
     *         both groups have finished shutting down
     */
    @Override // reactor.tcp.TcpServer
    public Promise<Void> shutdown() {
        final Deferred<Void, Promise<Void>> deferred = Promises.defer(this.env, getReactor().getDispatcher());
        Reactors.schedule(new Consumer<Void>() {
            @Override
            public void accept(Void ignored) {
                // One count per event loop group; the listener that brings it to zero completes the promise.
                final AtomicInteger pendingGroups = new AtomicInteger(2);
                GenericFutureListener<Future<Object>> onGroupTerminated = new GenericFutureListener<Future<Object>>() {
                    @Override
                    public void operationComplete(Future<Object> future) throws Exception {
                        if (pendingGroups.decrementAndGet() == 0) {
                            notifyShutdown();
                            deferred.accept((Void) null);
                        }
                    }
                };
                selectorGroup.shutdownGracefully().addListener(onGroupTerminated);
                ioGroup.shutdownGracefully().addListener(onGroupTerminated);
            }
        }, (Void) null, getReactor());
        return deferred.compose();
    }

    /**
     * Wraps an accepted channel in a {@link NettyTcpConnection}.
     *
     * @param c the accepted channel; it is cast to {@link SocketChannel}
     * @return a connection that uses the channel's own event loop as its I/O dispatcher and the
     *         reactor's dispatcher for events
     */
    @Override // reactor.tcp.TcpServer
    protected <C> NettyTcpConnection<IN, OUT> createConnection(C c) {
        SocketChannel socketChannel = (SocketChannel) c;
        return new NettyTcpConnection<>(this.env, getCodec(), new NettyEventLoopDispatcher(socketChannel.eventLoop()), this.eventsDispatcher, socketChannel);
    }

    /**
     * Creates the handlers appended at the end of an accepted channel's pipeline.
     *
     * @param socketChannel the accepted channel
     * @return a single inbound handler bound to the connection that {@code select} returns for
     *         the channel
     */
    protected ChannelHandler[] createChannelHandlers(SocketChannel socketChannel) {
        NettyTcpConnection<IN, OUT> connection = (NettyTcpConnection<IN, OUT>) select(socketChannel);
        return new ChannelHandler[]{new NettyTcpConnectionChannelInboundHandler(connection)};
    }

    /**
     * Configures a freshly accepted channel: socket options, optional SSL handler, optional
     * user-supplied pipeline configuration, the connection handlers, and a close listener that
     * hands the channel to {@code close}.
     */
    private void initAcceptedChannel(final SocketChannel socketChannel, SslOptions sslOptions) {
        applySocketOptions(socketChannel.config());
        if (log.isDebugEnabled()) {
            log.debug("CONNECT {}", socketChannel);
        }

        if (null != sslOptions) {
            // SSL goes first so every later handler sees decrypted traffic.
            SSLEngine sslEngine = new SSLEngineSupplier(sslOptions, false).m12get();
            if (log.isDebugEnabled()) {
                log.debug("SSL enabled using keystore {}", null != sslOptions.keystoreFile() ? sslOptions.keystoreFile() : "<DEFAULT>");
            }
            socketChannel.pipeline().addLast(new SslHandler(sslEngine));
        }

        if (options instanceof NettyServerSocketOptions) {
            NettyServerSocketOptions nettyOptions = (NettyServerSocketOptions) options;
            if (null != nettyOptions.pipelineConfigurer()) {
                nettyOptions.pipelineConfigurer().accept(socketChannel.pipeline());
            }
        }

        socketChannel.pipeline().addLast(createChannelHandlers(socketChannel));
        socketChannel.closeFuture().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (log.isDebugEnabled()) {
                    log.debug("CLOSE {}", socketChannel);
                }
                close(socketChannel);
            }
        });
    }

    /** Copies the per-connection settings from the server's socket options onto a channel config. */
    private void applySocketOptions(SocketChannelConfig config) {
        config.setReceiveBufferSize(options.rcvbuf());
        config.setSendBufferSize(options.sndbuf());
        config.setKeepAlive(options.keepAlive());
        config.setReuseAddress(options.reuseAddr());
        config.setSoLinger(options.linger());
        config.setTcpNoDelay(options.tcpNoDelay());
    }
}
