/*
 * Decompiled with CFR 0.152.
 */
package reactor.tcp.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.core.Observable;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.spec.Promises;
import reactor.core.spec.Reactors;
import reactor.event.dispatch.Dispatcher;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.support.NamedDaemonThreadFactory;
import reactor.tcp.TcpConnection;
import reactor.tcp.TcpServer;
import reactor.tcp.config.ServerSocketOptions;
import reactor.tcp.config.SslOptions;
import reactor.tcp.encoding.Codec;
import reactor.tcp.netty.NettyEventLoopDispatcher;
import reactor.tcp.netty.NettyServerSocketOptions;
import reactor.tcp.netty.NettyTcpConnection;
import reactor.tcp.netty.NettyTcpConnectionChannelInboundHandler;
import reactor.tcp.ssl.SSLEngineSupplier;
import reactor.util.Assert;

/**
 * A {@link TcpServer} implementation backed by a Netty {@link ServerBootstrap}.
 *
 * <p>Two {@link NioEventLoopGroup}s are created per server instance: one passed to the bootstrap as
 * the parent ("select") group and one as the child ("io") group. Their sizes are read from the
 * {@code reactor.tcp.selectThreadCount} and {@code reactor.tcp.ioThreadCount} environment
 * properties.
 *
 * @param <IN>  type of the objects decoded from inbound data
 * @param <OUT> type of the objects encoded to outbound data
 */
public class NettyTcpServer<IN, OUT>
extends TcpServer<IN, OUT> {
    /** Port used when the caller does not supply a listen address. */
    private static final int DEFAULT_PORT = 3000;

    // Deliberately per-instance and based on getClass() so that subclasses log under their own name.
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final ServerBootstrap bootstrap;
    private final Dispatcher eventsDispatcher;
    private final ServerSocketOptions options;
    private final EventLoopGroup selectorGroup;
    private final EventLoopGroup ioGroup;

    /**
     * Creates the server and fully configures (but does not bind) the underlying bootstrap.
     *
     * @param env                 environment used to look up the thread-count properties
     * @param reactor             reactor whose dispatcher is handed to every created connection
     * @param listenAddress       address to bind to; when {@code null} the wildcard address on
     *                            port {@value #DEFAULT_PORT} is used
     * @param opts                socket options; must not be {@code null}
     * @param sslOpts             SSL options; when non-{@code null} an {@link SslHandler} is placed
     *                            first in every accepted channel's pipeline
     * @param codec               codec passed through to the superclass
     * @param connectionConsumers consumers passed through to the superclass
     */
    protected NettyTcpServer(Environment env, Reactor reactor, InetSocketAddress listenAddress, ServerSocketOptions opts, final SslOptions sslOpts, Codec<Buffer, IN, OUT> codec, Collection<Consumer<TcpConnection<IN, OUT>>> connectionConsumers) {
        super(env, reactor, listenAddress, opts, sslOpts, codec, connectionConsumers);
        this.eventsDispatcher = reactor.getDispatcher();
        Assert.notNull(opts, "ServerSocketOptions cannot be null");
        this.options = opts;

        // NOTE(review): if Environment.PROCESSORS is the CPU count, PROCESSORS / 2 evaluates to 0 on
        // a single-processor host; confirm how NioEventLoopGroup treats a thread count of 0.
        int selectThreadCount = env.getProperty("reactor.tcp.selectThreadCount", Integer.class, Environment.PROCESSORS / 2);
        int ioThreadCount = env.getProperty("reactor.tcp.ioThreadCount", Integer.class, Environment.PROCESSORS);
        this.selectorGroup = new NioEventLoopGroup(selectThreadCount, new NamedDaemonThreadFactory("reactor-tcp-select"));
        this.ioGroup = new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));

        SocketAddress bindAddress = null == listenAddress ? new InetSocketAddress(DEFAULT_PORT) : listenAddress;

        this.bootstrap = new ServerBootstrap()
                .group(this.selectorGroup, this.ioGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, this.options.backlog())
                .option(ChannelOption.SO_RCVBUF, this.options.rcvbuf())
                .option(ChannelOption.SO_SNDBUF, this.options.sndbuf())
                .option(ChannelOption.SO_REUSEADDR, this.options.reuseAddr())
                .localAddress(bindAddress)
                .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    public void initChannel(final SocketChannel ch) throws Exception {
                        // Apply the per-connection socket options to the accepted channel.
                        SocketChannelConfig config = ch.config();
                        config.setReceiveBufferSize(NettyTcpServer.this.options.rcvbuf());
                        config.setSendBufferSize(NettyTcpServer.this.options.sndbuf());
                        config.setKeepAlive(NettyTcpServer.this.options.keepAlive());
                        config.setReuseAddress(NettyTcpServer.this.options.reuseAddr());
                        config.setSoLinger(NettyTcpServer.this.options.linger());
                        config.setTcpNoDelay(NettyTcpServer.this.options.tcpNoDelay());

                        NettyTcpServer.this.log.debug("CONNECT {}", ch);

                        // The SSL handler is added first so it sits ahead of every other handler.
                        if (null != sslOpts) {
                            SSLEngine ssl = new SSLEngineSupplier(sslOpts, false).get();
                            if (NettyTcpServer.this.log.isDebugEnabled()) {
                                NettyTcpServer.this.log.debug("SSL enabled using keystore {}", null != sslOpts.keystoreFile() ? sslOpts.keystoreFile() : "<DEFAULT>");
                            }
                            ch.pipeline().addLast(new SslHandler(ssl));
                        }

                        // Give Netty-specific options a chance to customise the pipeline before the
                        // connection handlers are appended.
                        if (NettyTcpServer.this.options instanceof NettyServerSocketOptions) {
                            NettyServerSocketOptions nettyOptions = (NettyServerSocketOptions) NettyTcpServer.this.options;
                            if (null != nettyOptions.pipelineConfigurer()) {
                                nettyOptions.pipelineConfigurer().accept(ch.pipeline());
                            }
                        }

                        ch.pipeline().addLast(NettyTcpServer.this.createChannelHandlers(ch));

                        // Tell the superclass when the channel goes away.
                        ch.closeFuture().addListener(new ChannelFutureListener() {

                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                NettyTcpServer.this.log.debug("CLOSE {}", ch);
                                NettyTcpServer.this.close(ch);
                            }
                        });
                    }
                });
    }

    /**
     * Binds the server socket.
     *
     * <p>The bind result is reported through a listener on the bind future. On success the bound
     * address is logged and, when {@code started} is non-{@code null}, it is handed to
     * {@code notifyStart}. On failure the cause is logged at error level and {@code started} is
     * <em>not</em> notified, so a failed bind is never reported as a successful start.
     *
     * @param started consumer to notify once the socket is bound; may be {@code null}
     * @return this server
     */
    @Override
    public NettyTcpServer<IN, OUT> start(final Consumer<Void> started) {
        ChannelFuture bindFuture = this.bootstrap.bind();
        // Always listen for the outcome so that bind failures are visible even without a callback.
        bindFuture.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    NettyTcpServer.this.log.error("Failed to bind server socket", future.cause());
                    return;
                }
                NettyTcpServer.this.log.info("BIND {}", future.channel().localAddress());
                if (null != started) {
                    NettyTcpServer.this.notifyStart(started);
                }
            }
        });
        return this;
    }

    /**
     * Schedules a graceful shutdown of both event loop groups on the server's reactor.
     *
     * <p>Once both groups have reported completion, {@code notifyShutdown()} is invoked and the
     * returned promise is fulfilled with {@code null}.
     *
     * @return a promise that is completed after both event loop groups have shut down
     */
    @Override
    public Promise<Void> shutdown() {
        final Deferred<Void, Promise<Void>> d = Promises.defer(this.env, this.getReactor().getDispatcher());
        Reactors.schedule(new Consumer<Void>() {

            @Override
            public void accept(Void v) {
                // One shared listener counts down as each of the two groups finishes.
                final AtomicInteger groupsToShutdown = new AtomicInteger(2);
                GenericFutureListener<Future<Object>> listener = new GenericFutureListener<Future<Object>>() {

                    @Override
                    public void operationComplete(Future<Object> future) throws Exception {
                        if (groupsToShutdown.decrementAndGet() == 0) {
                            NettyTcpServer.this.notifyShutdown();
                            // The cast selects the value overload of accept rather than an error one.
                            d.accept((Void) null);
                        }
                    }
                };
                NettyTcpServer.this.selectorGroup.shutdownGracefully().addListener(listener);
                NettyTcpServer.this.ioGroup.shutdownGracefully().addListener(listener);
            }
        }, null, this.getReactor());
        return d.compose();
    }

    /**
     * Wraps an accepted channel in a {@link NettyTcpConnection}.
     *
     * @param channel the accepted channel; cast to {@link SocketChannel}, so any other type results
     *                in a {@link ClassCastException}
     * @return a connection whose IO dispatcher is bound to the channel's event loop and whose
     *         events dispatcher is the one obtained from the reactor at construction time
     */
    @Override
    protected <C> NettyTcpConnection<IN, OUT> createConnection(C channel) {
        SocketChannel ch = (SocketChannel) channel;
        return new NettyTcpConnection<IN, OUT>(this.env, this.getCodec(), new NettyEventLoopDispatcher(ch.eventLoop()), this.eventsDispatcher, ch);
    }

    /**
     * Creates the handlers appended to the end of a newly accepted channel's pipeline.
     *
     * @param ch the accepted channel
     * @return a single-element array holding the inbound handler for the channel's connection
     */
    protected ChannelHandler[] createChannelHandlers(SocketChannel ch) {
        NettyTcpConnection<IN, OUT> conn = (NettyTcpConnection<IN, OUT>) this.select(ch);
        return new ChannelHandler[]{new NettyTcpConnectionChannelInboundHandler(conn)};
    }
}

