/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.netty.udp;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ChannelFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.ProtocolFamily;
import java.net.SocketAddress;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.config.ServerSocketOptions;
import reactor.io.net.impl.netty.NettyChannelHandlerBridge;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.io.net.impl.netty.NettyServerSocketOptions;
import reactor.io.net.udp.DatagramServer;
import reactor.rx.Promise;
import reactor.rx.Promises;

public class NettyDatagramServer<IN, OUT>
extends DatagramServer<IN, OUT> {
    private static final Logger log = LoggerFactory.getLogger(NettyDatagramServer.class);
    private final NettyServerSocketOptions nettyOptions;
    private final Bootstrap bootstrap;
    private final EventLoopGroup ioGroup;
    private volatile NioDatagramChannel channel;

    public NettyDatagramServer(@Nonnull Environment env, @Nonnull Dispatcher dispatcher, @Nullable InetSocketAddress listenAddress, @Nullable NetworkInterface multicastInterface, @Nonnull ServerSocketOptions options, @Nullable Codec<Buffer, IN, OUT> codec) {
        super(env, dispatcher, listenAddress, multicastInterface, options, codec);
        this.nettyOptions = options instanceof NettyServerSocketOptions ? (NettyServerSocketOptions)options : null;
        if (null != this.nettyOptions && null != this.nettyOptions.eventLoopGroup()) {
            this.ioGroup = this.nettyOptions.eventLoopGroup();
        } else {
            int ioThreadCount = this.getDefaultEnvironment().getIntProperty("reactor.udp.ioThreadCount", Environment.PROCESSORS);
            this.ioGroup = new NioEventLoopGroup(ioThreadCount, (ThreadFactory)new NamedDaemonThreadFactory("reactor-udp-io"));
        }
        final InternetProtocolFamily family = this.toNettyFamily(options.protocolFamily());
        this.bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group(this.ioGroup)).option(ChannelOption.SO_RCVBUF, (Object)options.rcvbuf())).option(ChannelOption.SO_SNDBUF, (Object)options.sndbuf())).option(ChannelOption.SO_REUSEADDR, (Object)options.reuseAddr())).option(ChannelOption.AUTO_READ, (Object)false)).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)options.timeout())).channelFactory((ChannelFactory)new ChannelFactory<Channel>(){

            public Channel newChannel() {
                return new NioDatagramChannel(family);
            }
        });
        if (null != listenAddress) {
            this.bootstrap.localAddress((SocketAddress)listenAddress);
        } else {
            this.bootstrap.localAddress(NetUtil.LOCALHOST, 3000);
        }
        if (null != multicastInterface) {
            this.bootstrap.option(ChannelOption.IP_MULTICAST_IF, (Object)multicastInterface);
        }
    }

    private InternetProtocolFamily toNettyFamily(ProtocolFamily family) {
        if (family == null) {
            return null;
        }
        switch (family.name()) {
            case "INET": {
                return InternetProtocolFamily.IPv4;
            }
            case "INET6": {
                return InternetProtocolFamily.IPv6;
            }
        }
        throw new IllegalArgumentException("Unsupported protocolFamily: " + family.name());
    }

    @Override
    protected Promise<Void> doStart(final ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> channelHandler) {
        final Promise promise = Promises.ready((Environment)this.getDefaultEnvironment(), (Dispatcher)this.getDefaultDispatcher());
        ChannelFuture future = ((Bootstrap)this.bootstrap.handler((ChannelHandler)new ChannelInitializer<NioDatagramChannel>(){

            public void initChannel(NioDatagramChannel ch) throws Exception {
                if (null != NettyDatagramServer.this.nettyOptions && null != NettyDatagramServer.this.nettyOptions.pipelineConfigurer()) {
                    NettyDatagramServer.this.nettyOptions.pipelineConfigurer().accept((Object)ch.pipeline());
                }
                NettyDatagramServer.this.bindChannel(channelHandler, ch);
            }
        })).bind();
        future.addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    log.info("BIND {}", (Object)future.channel().localAddress());
                    NettyDatagramServer.this.channel = (NioDatagramChannel)future.channel();
                    promise.onComplete();
                } else {
                    promise.onError(future.cause());
                }
            }
        });
        return promise;
    }

    @Override
    protected Promise<Void> doShutdown() {
        final Promise d = Promises.prepare();
        ChannelFuture future = this.channel.close();
        final GenericFutureListener listener = new GenericFutureListener(){

            public void operationComplete(Future future) throws Exception {
                if (future.isSuccess()) {
                    d.onComplete();
                } else {
                    d.onError(future.cause());
                }
            }
        };
        future.addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    d.onError(future.cause());
                    return;
                }
                if (null == NettyDatagramServer.this.nettyOptions || null == NettyDatagramServer.this.nettyOptions.eventLoopGroup()) {
                    NettyDatagramServer.this.ioGroup.shutdownGracefully().addListener(listener);
                }
            }
        });
        return d;
    }

    @Override
    public Promise<Void> join(final InetAddress multicastAddress, NetworkInterface iface) {
        if (null == this.channel) {
            throw new IllegalStateException("DatagramServer not running.");
        }
        final Promise d = Promises.ready((Environment)this.getDefaultEnvironment(), (Dispatcher)this.getDefaultDispatcher());
        if (null == iface && null != this.getMulticastInterface()) {
            iface = this.getMulticastInterface();
        }
        ChannelFuture future = null != iface ? this.channel.joinGroup(new InetSocketAddress(multicastAddress, this.getListenAddress().getPort()), iface) : this.channel.joinGroup(multicastAddress);
        future.addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                log.info("JOIN {}", (Object)multicastAddress);
                if (future.isSuccess()) {
                    d.onComplete();
                } else {
                    d.onError(future.cause());
                }
            }
        });
        return d;
    }

    @Override
    public Promise<Void> leave(final InetAddress multicastAddress, NetworkInterface iface) {
        if (null == this.channel) {
            throw new IllegalStateException("DatagramServer not running.");
        }
        if (null == iface && null != this.getMulticastInterface()) {
            iface = this.getMulticastInterface();
        }
        final Promise d = Promises.ready((Environment)this.getDefaultEnvironment(), (Dispatcher)this.getDefaultDispatcher());
        ChannelFuture future = null != iface ? this.channel.leaveGroup(new InetSocketAddress(multicastAddress, this.getListenAddress().getPort()), iface) : this.channel.leaveGroup(multicastAddress);
        future.addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                log.info("LEAVE {}", (Object)multicastAddress);
                if (future.isSuccess()) {
                    d.onComplete();
                } else {
                    d.onError(future.cause());
                }
            }
        });
        return d;
    }

    protected void bindChannel(ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> handler, Object _ioChannel) {
        NioDatagramChannel ioChannel = (NioDatagramChannel)_ioChannel;
        NettyChannelStream netChannel = new NettyChannelStream(this.getDefaultEnvironment(), this.getDefaultCodec(), this.getDefaultPrefetchSize(), this.getDefaultDispatcher(), (Channel)ioChannel);
        ChannelPipeline pipeline = ioChannel.pipeline();
        if (log.isDebugEnabled()) {
            pipeline.addLast(new ChannelHandler[]{new LoggingHandler(NettyDatagramServer.class)});
        }
        pipeline.addLast(new ChannelHandler[]{new NettyChannelHandlerBridge<IN, OUT>(handler, netChannel){

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (msg != null && DatagramPacket.class.isAssignableFrom(msg.getClass())) {
                    super.channelRead(ctx, ((DatagramPacket)msg).content());
                } else {
                    super.channelRead(ctx, msg);
                }
            }
        }, new ChannelOutboundHandlerAdapter(){

            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                super.write(ctx, msg, promise);
            }
        }});
    }
}

