/*
 * Decompiled with CFR 0.152.
 */
package reactor.net.netty.tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.composable.spec.Streams;
import reactor.event.dispatch.Dispatcher;
import reactor.function.Consumer;
import reactor.function.Supplier;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.net.AbstractNetChannel;
import reactor.net.NetChannel;
import reactor.net.Reconnect;
import reactor.net.config.ClientSocketOptions;
import reactor.net.config.SslOptions;
import reactor.net.netty.NettyClientSocketOptions;
import reactor.net.netty.NettyEventLoopDispatcher;
import reactor.net.netty.NettyNetChannel;
import reactor.net.netty.NettyNetChannelInboundHandler;
import reactor.net.netty.NettyNetChannelOutboundHandler;
import reactor.net.tcp.TcpClient;
import reactor.net.tcp.ssl.SSLEngineSupplier;
import reactor.support.NamedDaemonThreadFactory;
import reactor.tuple.Tuple2;

/**
 * A {@link TcpClient} implementation backed by a Netty {@link Bootstrap} using
 * {@link NioSocketChannel}s.
 *
 * <p>If the supplied options are {@link NettyClientSocketOptions} that carry an
 * {@link EventLoopGroup}, that group is used and is <em>not</em> shut down by
 * {@link #close(Consumer)}. Otherwise the client creates its own
 * {@link NioEventLoopGroup} and shuts it down on close.
 *
 * @param <IN>  type of the decoded inbound data
 * @param <OUT> type of the outbound data before encoding
 */
public class NettyTcpClient<IN, OUT>
extends TcpClient<IN, OUT> {
    private static final Logger log = LoggerFactory.getLogger(NettyTcpClient.class);

    private final NettyClientSocketOptions nettyOptions;
    private final Bootstrap bootstrap;
    private final EventLoopGroup ioGroup;
    private final Supplier<ChannelFuture> connectionSupplier;
    private volatile InetSocketAddress connectAddress;
    /** Once set, {@link #connectionSupplier} stops producing new connection attempts. */
    private volatile boolean closing;

    /**
     * Creates the client and configures (but does not yet connect) the underlying Netty bootstrap.
     *
     * @param env            environment used for property lookup and the reconnect timer
     * @param reactor        reactor whose dispatcher is used for the returned composables
     * @param connectAddress remote address to connect to
     * @param options        socket options; Netty-specific extras are honoured when this is a
     *                       {@link NettyClientSocketOptions}
     * @param sslOptions     when non-null, an {@link SslHandler} is installed first in the pipeline
     * @param codec          codec handed to every created channel
     * @param consumers      consumers passed through to the parent class
     */
    public NettyTcpClient(@Nonnull Environment env, @Nonnull Reactor reactor, @Nonnull InetSocketAddress connectAddress, final @Nonnull ClientSocketOptions options, final @Nullable SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec, @Nonnull Collection<Consumer<NetChannel<IN, OUT>>> consumers) {
        super(env, reactor, connectAddress, options, sslOptions, codec, consumers);
        this.connectAddress = connectAddress;
        this.nettyOptions = options instanceof NettyClientSocketOptions ? (NettyClientSocketOptions) options : null;

        if (null != this.nettyOptions && null != this.nettyOptions.eventLoopGroup()) {
            // Caller-provided group: shared, so this client must never shut it down.
            this.ioGroup = this.nettyOptions.eventLoopGroup();
        } else {
            int ioThreadCount = env.getProperty("reactor.tcp.ioThreadCount", Integer.class, Environment.PROCESSORS);
            this.ioGroup = new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));
        }

        this.bootstrap = new Bootstrap()
                .group(this.ioGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_RCVBUF, options.rcvbuf())
                .option(ChannelOption.SO_SNDBUF, options.sndbuf())
                .option(ChannelOption.SO_KEEPALIVE, options.keepAlive())
                .option(ChannelOption.SO_LINGER, options.linger())
                .option(ChannelOption.TCP_NODELAY, options.tcpNoDelay())
                .remoteAddress(this.connectAddress)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.config().setConnectTimeoutMillis(options.timeout());

                        if (null != sslOptions) {
                            SSLEngine ssl = new SSLEngineSupplier(sslOptions, true).get();
                            if (log.isDebugEnabled()) {
                                log.debug("SSL enabled using keystore {}",
                                        (null != sslOptions.keystoreFile() ? sslOptions.keystoreFile() : "<DEFAULT>"));
                            }
                            ch.pipeline().addLast(new SslHandler(ssl));
                        }
                        // User customisation runs after SSL but before the Reactor handlers are appended.
                        if (null != NettyTcpClient.this.nettyOptions
                                && null != NettyTcpClient.this.nettyOptions.pipelineConfigurer()) {
                            NettyTcpClient.this.nettyOptions.pipelineConfigurer().accept(ch.pipeline());
                        }
                        ch.pipeline().addLast(NettyTcpClient.this.createChannelHandlers(ch));
                    }
                });

        this.connectionSupplier = new Supplier<ChannelFuture>() {
            @Override
            public ChannelFuture get() {
                if (NettyTcpClient.this.closing) {
                    // No new connections once the client is closing.
                    return null;
                }
                return NettyTcpClient.this.bootstrap.connect(NettyTcpClient.this.getConnectAddress());
            }
        };
    }

    /**
     * Starts a single connection attempt.
     *
     * @return a promise fulfilled with the connected channel, or rejected with the connection
     *         failure cause; rejected with {@link IllegalStateException} if the client is closing
     */
    @Override
    public Promise<NetChannel<IN, OUT>> open() {
        Deferred<NetChannel<IN, OUT>, Promise<NetChannel<IN, OUT>>> connection =
                Promises.defer(this.getEnvironment(), this.getReactor().getDispatcher());
        if (!this.openChannel(new ConnectingChannelListener(connection))) {
            // Without this the caller would be left with a promise that never completes.
            connection.accept(new IllegalStateException("Cannot open a connection: client is closing"));
        }
        return connection.compose();
    }

    /**
     * Starts a connection that is re-established according to {@code reconnect} whenever a
     * connection attempt fails or an established channel becomes inactive.
     *
     * @param reconnect strategy consulted for the next address and delay; a {@code null} result
     *                  from it stops reconnecting
     * @return a stream that receives every successfully connected channel; receives an
     *         {@link IllegalStateException} if the client is closing
     */
    @Override
    public Stream<NetChannel<IN, OUT>> open(Reconnect reconnect) {
        Deferred<NetChannel<IN, OUT>, Stream<NetChannel<IN, OUT>>> connections =
                Streams.defer(this.getEnvironment(), this.getReactor().getDispatcher());
        if (!this.openChannel(new ReconnectingChannelListener(this.connectAddress, reconnect, connections))) {
            connections.accept(new IllegalStateException("Cannot open a connection: client is closing"));
        }
        return connections.compose();
    }

    /**
     * Closes this client. No further connection or reconnection attempts are started afterwards.
     *
     * <p>A caller-provided event loop group is left running and {@code onClose} receives
     * {@code true}; a group created by this client is shut down gracefully and {@code onClose}
     * receives whether that shutdown succeeded.
     *
     * @param onClose optional callback invoked with the outcome
     */
    @Override
    public void close(final @Nullable Consumer<Boolean> onClose) {
        // Stop pending reconnect timers from connecting through a closed client / shut-down group.
        this.closing = true;

        if (null != this.nettyOptions && null != this.nettyOptions.eventLoopGroup()) {
            if (null != onClose) {
                this.ioGroup.submit(new Runnable() {
                    @Override
                    public void run() {
                        onClose.accept(true);
                    }
                });
            }
        } else {
            this.ioGroup.shutdownGracefully().addListener(new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (null != onClose) {
                        // Must be a Boolean: the consumer is typed Consumer<Boolean>.
                        onClose.accept(future.isDone() && future.isSuccess());
                    }
                }
            });
        }
    }

    /**
     * Wraps a Netty {@link SocketChannel} in a {@link NettyNetChannel} whose dispatcher runs on the
     * channel's own event loop.
     *
     * @param ioChannel the Netty channel; must be a {@link SocketChannel}
     * @return the wrapping channel
     */
    @Override
    protected <C> NetChannel<IN, OUT> createChannel(C ioChannel) {
        SocketChannel ch = (SocketChannel) ioChannel;
        int backlog = this.getEnvironment().getProperty("reactor.tcp.connectionReactorBacklog", Integer.class, 128);
        return new NettyNetChannel<IN, OUT>(
                this.getEnvironment(),
                this.getCodec(),
                new NettyEventLoopDispatcher(ch.eventLoop(), backlog),
                this.getReactor(),
                ch);
    }

    /**
     * Builds the handlers appended to the end of every new channel's pipeline: an inbound handler
     * bound to the channel created by {@link #createChannel(Object)}, followed by an outbound handler.
     *
     * @param ioChannel the channel being initialised
     * @return the handlers, in pipeline order
     */
    protected ChannelHandler[] createChannelHandlers(SocketChannel ioChannel) {
        NettyNetChannel<IN, OUT> conn = (NettyNetChannel<IN, OUT>) this.createChannel(ioChannel);
        NettyNetChannelInboundHandler readHandler = new NettyNetChannelInboundHandler().setNetChannel(conn);
        NettyNetChannelOutboundHandler writeHandler = new NettyNetChannelOutboundHandler();
        return new ChannelHandler[]{readHandler, writeHandler};
    }

    /**
     * Requests a connection from the supplier and registers {@code listener} on it.
     *
     * @param listener notified when the connection attempt completes; may be {@code null}
     * @return {@code false} if no attempt was started because the client is closing
     */
    private boolean openChannel(ChannelFutureListener listener) {
        ChannelFuture channel = this.connectionSupplier.get();
        if (null == channel) {
            return false;
        }
        if (null != listener) {
            channel.addListener(listener);
        }
        return true;
    }

    /**
     * Connection listener that consults a {@link Reconnect} strategy after a failed connection
     * attempt and after an established channel becomes inactive.
     */
    private class ReconnectingChannelListener
    implements ChannelFutureListener {
        private final AtomicInteger attempts = new AtomicInteger(0);
        private final Reconnect reconnect;
        private final Deferred<NetChannel<IN, OUT>, Stream<NetChannel<IN, OUT>>> connections;
        private volatile InetSocketAddress connectAddress;

        private ReconnectingChannelListener(InetSocketAddress connectAddress, Reconnect reconnect, Deferred<NetChannel<IN, OUT>, Stream<NetChannel<IN, OUT>>> connections) {
            this.connectAddress = connectAddress;
            this.reconnect = reconnect;
            this.connections = connections;
        }

        @Override
        public void operationComplete(final ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                int attempt = this.attempts.incrementAndGet();
                Tuple2<InetSocketAddress, Long> tup = this.reconnect.reconnect(this.connectAddress, attempt);
                if (null == tup) {
                    // The strategy gave up: surface the last failure on the stream.
                    if (log.isErrorEnabled()) {
                        log.error("Reconnection to {} failed after {} attempts. Giving up.", this.connectAddress, attempt - 1);
                    }
                    future.channel().eventLoop().submit(new Runnable() {
                        @Override
                        public void run() {
                            ReconnectingChannelListener.this.connections.accept(future.cause());
                        }
                    });
                    return;
                }
                this.attemptReconnect(tup);
                return;
            }

            if (log.isInfoEnabled()) {
                log.info("CONNECTED: " + future.channel());
            }
            final Channel ioCh = future.channel();
            ChannelPipeline ioChPipline = ioCh.pipeline();
            // Raw type kept on purpose: the handler's getter signature is declared outside this file.
            final AbstractNetChannel ch = ioChPipline.get(NettyNetChannelInboundHandler.class).getNetChannel();

            ioChPipline.addLast(new ChannelDuplexHandler() {
                @Override
                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                    if (log.isInfoEnabled()) {
                        log.info("CLOSED: " + ioCh);
                    }
                    NettyTcpClient.this.notifyClose(ch);

                    Tuple2<InetSocketAddress, Long> tup = ReconnectingChannelListener.this.reconnect.reconnect(
                            ReconnectingChannelListener.this.connectAddress,
                            ReconnectingChannelListener.this.attempts.incrementAndGet());
                    if (null != tup) {
                        if (!((NettyNetChannel) ch).isClosing()) {
                            ReconnectingChannelListener.this.attemptReconnect(tup);
                        } else {
                            // The channel was closed deliberately: stop producing connections.
                            NettyTcpClient.this.closing = true;
                        }
                    }
                    // Always forward the event, including when no reconnect is attempted.
                    super.channelInactive(ctx);
                }
            });

            ioCh.eventLoop().submit(new Runnable() {
                @Override
                public void run() {
                    ReconnectingChannelListener.this.connections.accept(ch);
                }
            });
        }

        /**
         * Records the address returned by the strategy and schedules a new connection attempt on
         * the environment's root timer after the returned delay (milliseconds).
         */
        private void attemptReconnect(Tuple2<InetSocketAddress, Long> tup) {
            this.connectAddress = tup.getT1();
            // NOTE(review): the connection supplier connects to getConnectAddress(), which is
            // declared outside this file. If that keeps returning the original address, the
            // address chosen by the Reconnect strategy is not the one actually dialled,
            // because Bootstrap.connect(SocketAddress) uses the explicit address it is given.
            // Confirm.
            NettyTcpClient.this.bootstrap.remoteAddress(this.connectAddress);
            long delay = tup.getT2();
            if (log.isInfoEnabled()) {
                log.info("Failed to connect to {}. Attempting reconnect in {}ms.", this.connectAddress, delay);
            }
            NettyTcpClient.this.getEnvironment().getRootTimer().submit(new Consumer<Long>() {
                @Override
                public void accept(Long now) {
                    // Does nothing if the client started closing while the timer was pending.
                    NettyTcpClient.this.openChannel(ReconnectingChannelListener.this);
                }
            }, delay, TimeUnit.MILLISECONDS).cancelAfterUse();
        }
    }

    /**
     * Connection listener for a single, non-reconnecting attempt: completes the promise with the
     * connected channel or with the failure cause.
     */
    private class ConnectingChannelListener
    implements ChannelFutureListener {
        private final Deferred<NetChannel<IN, OUT>, Promise<NetChannel<IN, OUT>>> connection;

        private ConnectingChannelListener(Deferred<NetChannel<IN, OUT>, Promise<NetChannel<IN, OUT>>> connection) {
            this.connection = connection;
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                if (log.isErrorEnabled()) {
                    log.error(future.cause().getMessage(), future.cause());
                }
                this.connection.accept(future.cause());
                return;
            }

            if (log.isInfoEnabled()) {
                log.info("CONNECTED: " + future.channel());
            }
            NettyNetChannelInboundHandler inboundHandler =
                    future.channel().pipeline().get(NettyNetChannelInboundHandler.class);
            // Raw type kept on purpose: the handler's getter signature is declared outside this file.
            final AbstractNetChannel ch = inboundHandler.getNetChannel();

            future.channel().closeFuture().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (log.isInfoEnabled()) {
                        log.info("CLOSED: " + future.channel());
                    }
                    NettyTcpClient.this.notifyClose(ch);
                }
            });

            // Hand the channel over on its own event loop.
            future.channel().eventLoop().submit(new Runnable() {
                @Override
                public void run() {
                    ConnectingChannelListener.this.connection.accept(ch);
                }
            });
        }
    }
}

