/*
 * Decompiled with CFR 0.152.
 */
package reactor.tcp.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.composable.spec.Streams;
import reactor.event.dispatch.Dispatcher;
import reactor.function.Consumer;
import reactor.function.Supplier;
import reactor.io.Buffer;
import reactor.support.NamedDaemonThreadFactory;
import reactor.tcp.Reconnect;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection;
import reactor.tcp.config.ClientSocketOptions;
import reactor.tcp.config.SslOptions;
import reactor.tcp.encoding.Codec;
import reactor.tcp.netty.NettyClientSocketOptions;
import reactor.tcp.netty.NettyEventLoopDispatcher;
import reactor.tcp.netty.NettyTcpConnection;
import reactor.tcp.netty.NettyTcpConnectionChannelInboundHandler;
import reactor.tcp.ssl.SSLEngineSupplier;
import reactor.tuple.Tuple2;

/**
 * A {@link TcpClient} implementation backed by a Netty {@link Bootstrap} and a
 * {@link NioEventLoopGroup}.
 *
 * <p>Connections are opened either once ({@link #open()}) or with a
 * {@link Reconnect} strategy ({@link #open(Reconnect)}) that is consulted both
 * when a connect attempt fails and when an established channel closes without
 * the connection having been marked as closing.
 *
 * <p>The current target address is held in a {@code volatile} field because the
 * reconnect strategy may replace it from an I/O callback.
 *
 * @param <IN>  type of decoded inbound messages
 * @param <OUT> type of outbound messages before encoding
 */
public class NettyTcpClient<IN, OUT>
extends TcpClient<IN, OUT> {
    private final Logger log = LoggerFactory.getLogger(NettyTcpClient.class);
    private final Bootstrap bootstrap;
    /** Dispatcher taken from the {@link Reactor} passed to the constructor. */
    private final Dispatcher eventsDispatcher;
    private final ClientSocketOptions options;
    /** Event loop group owned by this client; shut down in {@link #doClose(Deferred)}. */
    private final EventLoopGroup ioGroup;
    /** Starts a connect attempt, or yields {@code null} once the client is closing. */
    private final Supplier<ChannelFuture> connectionSupplier;
    /** Address used for the next connect attempt; may be replaced by a {@link Reconnect} strategy. */
    private volatile InetSocketAddress connectAddress;
    /** Set by {@link #doClose(Deferred)}; suppresses any further connect attempts. */
    private volatile boolean closing;

    /**
     * Creates the client and configures (but does not connect) the Netty bootstrap.
     *
     * <p>The number of I/O threads is read from the environment property
     * {@code reactor.tcp.ioThreadCount}, defaulting to {@link Environment#PROCESSORS}.
     *
     * @param env            environment used for configuration properties and the root timer
     * @param reactor        reactor whose dispatcher is used as the events dispatcher
     * @param connectAddress initial remote address
     * @param opts           socket options applied to the bootstrap; if it is a
     *                       {@link NettyClientSocketOptions} with a pipeline configurer,
     *                       that configurer is invoked for every new channel
     * @param sslOpts        SSL options, or {@code null} to connect without an {@link SslHandler}
     * @param codec          codec handed to the superclass, may be {@code null}
     */
    public NettyTcpClient(@Nonnull Environment env, @Nonnull Reactor reactor, @Nonnull InetSocketAddress connectAddress, @Nonnull ClientSocketOptions opts, final @Nullable SslOptions sslOpts, @Nullable Codec<Buffer, IN, OUT> codec) {
        super(env, reactor, connectAddress, opts, sslOpts, codec);
        this.eventsDispatcher = reactor.getDispatcher();
        this.connectAddress = connectAddress;
        this.options = opts;
        int ioThreadCount = (Integer)env.getProperty("reactor.tcp.ioThreadCount", Integer.class, (Object)Environment.PROCESSORS);
        this.ioGroup = new NioEventLoopGroup(ioThreadCount, (ThreadFactory)new NamedDaemonThreadFactory("reactor-tcp-io"));
        this.bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group(this.ioGroup)).channel(NioSocketChannel.class)).option(ChannelOption.SO_RCVBUF, (Object)this.options.rcvbuf())).option(ChannelOption.SO_SNDBUF, (Object)this.options.sndbuf())).option(ChannelOption.SO_KEEPALIVE, (Object)this.options.keepAlive())).option(ChannelOption.SO_LINGER, (Object)this.options.linger())).option(ChannelOption.TCP_NODELAY, (Object)this.options.tcpNoDelay())).remoteAddress((SocketAddress)this.connectAddress).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            public void initChannel(final SocketChannel ch) throws Exception {
                ch.config().setConnectTimeoutMillis(NettyTcpClient.this.options.timeout());
                // The SSL handler is added first so it sits in front of every other handler.
                if (null != sslOpts) {
                    SSLEngine ssl = new SSLEngineSupplier(sslOpts, true).get();
                    NettyTcpClient.this.log.debug("SSL enabled using keystore {}", (Object)(null != sslOpts.keystoreFile() ? sslOpts.keystoreFile() : "<DEFAULT>"));
                    ch.pipeline().addLast(new ChannelHandler[]{new SslHandler(ssl)});
                }
                // Give user code a chance to customise the pipeline before the Reactor handler is appended.
                if (NettyTcpClient.this.options instanceof NettyClientSocketOptions && null != ((NettyClientSocketOptions)NettyTcpClient.this.options).pipelineConfigurer()) {
                    ((NettyClientSocketOptions)NettyTcpClient.this.options).pipelineConfigurer().accept((Object)ch.pipeline());
                }
                ch.pipeline().addLast(NettyTcpClient.this.createChannelHandlers(ch));
                // Drop the channel from the connection registry as soon as it closes.
                ch.closeFuture().addListener((GenericFutureListener)new ChannelFutureListener(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        NettyTcpClient.this.connections.remove(ch);
                    }
                });
            }
        });
        this.connectionSupplier = new Supplier<ChannelFuture>(){

            public ChannelFuture get() {
                if (!NettyTcpClient.this.closing) {
                    return NettyTcpClient.this.bootstrap.connect((SocketAddress)NettyTcpClient.this.connectAddress);
                }
                return null;
            }
        };
    }

    /**
     * Starts a single connect attempt to the current address.
     *
     * <p>If the client is already closing, no attempt is started and this method
     * does not complete the returned promise.
     *
     * @return a promise fulfilled with the connection, or failed with the connect error
     */
    @Override
    public Promise<TcpConnection<IN, OUT>> open() {
        Deferred connection = Promises.defer((Environment)this.env, (Dispatcher)this.eventsDispatcher);
        this.createConnection(this.createConnectListener(connection));
        return (Promise)connection.compose();
    }

    /**
     * Starts a connect attempt that is retried according to {@code reconnect}.
     *
     * @param reconnect strategy consulted after a failed connect and after an
     *                  established channel closes; returning {@code null} from it
     *                  stops further attempts
     * @return a stream that receives each successfully established connection
     */
    @Override
    public Stream<TcpConnection<IN, OUT>> open(Reconnect reconnect) {
        Deferred connections = Streams.defer((Environment)this.env, (Dispatcher)this.eventsDispatcher);
        this.createConnection(this.createReconnectListener(connections, reconnect));
        return (Stream)connections.compose();
    }

    /**
     * Wraps a Netty channel in a {@link NettyTcpConnection}.
     *
     * @param channel the channel; must be a {@link SocketChannel}
     * @return a connection bound to the channel's event loop and the current connect address
     */
    @Override
    protected <C> TcpConnection<IN, OUT> createConnection(C channel) {
        SocketChannel ch = (SocketChannel)channel;
        return new NettyTcpConnection(this.env, this.getCodec(), (Dispatcher)new NettyEventLoopDispatcher(ch.eventLoop()), this.eventsDispatcher, ch, this.connectAddress);
    }

    /**
     * Builds the handlers appended to the end of every new channel's pipeline.
     *
     * @param ch the channel being initialised
     * @return a single inbound handler bound to the connection selected for {@code ch}
     */
    protected ChannelHandler[] createChannelHandlers(SocketChannel ch) {
        NettyTcpConnection conn = (NettyTcpConnection)this.select(ch);
        return new ChannelHandler[]{new NettyTcpConnectionChannelInboundHandler(conn)};
    }

    /**
     * Marks the client as closing and shuts the I/O event loop group down.
     *
     * <p>Blocks the calling thread for up to 30 seconds waiting for the shutdown
     * to finish. {@code d} is completed with {@code null} on success, with a
     * {@link TimeoutException} if the wait elapses, or with the
     * {@link InterruptedException} if the wait is interrupted (the interrupt flag
     * is restored in that case).
     *
     * @param d deferred to complete with the outcome of the shutdown
     */
    @Override
    protected void doClose(Deferred<Void, Promise<Void>> d) {
        this.closing = true;
        try {
            final CountDownLatch latch = new CountDownLatch(1);
            this.ioGroup.shutdownGracefully().addListener(new GenericFutureListener(){

                public void operationComplete(Future future) throws Exception {
                    latch.countDown();
                }
            });
            if (latch.await(30L, TimeUnit.SECONDS)) {
                d.accept((Object)null);
            } else {
                d.accept((Throwable)new TimeoutException("NettyTcpClient could not close connection after 30 seconds"));
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            d.accept((Throwable)e);
        }
    }

    /**
     * Creates the listener used by {@link #open()}: it fails the deferred on a
     * connect error, otherwise registers close handling on the channel and
     * fulfils the deferred with the connection.
     *
     * @param connection deferred to complete with the connect outcome
     * @return the listener to attach to the connect future
     */
    private ChannelFutureListener createConnectListener(final Deferred<TcpConnection<IN, OUT>, Promise<TcpConnection<IN, OUT>>> connection) {
        return new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    if (NettyTcpClient.this.log.isErrorEnabled()) {
                        NettyTcpClient.this.log.error(future.cause().getMessage(), future.cause());
                    }
                    connection.accept(future.cause());
                    return;
                }
                if (NettyTcpClient.this.log.isInfoEnabled()) {
                    NettyTcpClient.this.log.info("CONNECT: " + future.channel());
                }
                final NettyTcpConnection conn = (NettyTcpConnection)NettyTcpClient.this.select(future.channel());
                future.channel().closeFuture().addListener((GenericFutureListener)new ChannelFutureListener(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (NettyTcpClient.this.log.isInfoEnabled()) {
                            NettyTcpClient.this.log.info("CLOSED: " + future.channel());
                        }
                        NettyTcpClient.this.connections.remove(future.channel());
                        NettyTcpClient.this.notifyClose(conn);
                    }
                });
                connection.accept((Object)conn);
            }
        };
    }

    /**
     * Creates the listener used by {@link #open(Reconnect)}.
     *
     * <p>The same listener instance is re-attached to every subsequent connect
     * attempt, so the attempt counter it holds spans the whole lifetime of the
     * returned stream.
     *
     * @param connections deferred stream that receives each established connection,
     *                    or the last connect error once the strategy gives up
     * @param reconnect   strategy deciding whether, where and when to reconnect
     * @return the listener to attach to the connect future
     */
    private ChannelFutureListener createReconnectListener(final Deferred<TcpConnection<IN, OUT>, Stream<TcpConnection<IN, OUT>>> connections, final Reconnect reconnect) {
        return new ChannelFutureListener(){
            final AtomicInteger attempts = new AtomicInteger(0);
            // Captured so nested anonymous classes can re-register this listener.
            final ChannelFutureListener self = this;

            public void operationComplete(final ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    int attempt = this.attempts.incrementAndGet();
                    Tuple2<InetSocketAddress, Long> tup = reconnect.reconnect(NettyTcpClient.this.connectAddress, attempt);
                    if (null == tup) {
                        // The strategy gave up: report the failure to the stream and stop.
                        if (NettyTcpClient.this.log.isErrorEnabled()) {
                            NettyTcpClient.this.log.error("Reconnection to {} failed after {} attempts.", (Object)NettyTcpClient.this.connectAddress, (Object)(attempt - 1));
                        }
                        future.channel().eventLoop().execute(new Runnable(){

                            @Override
                            public void run() {
                                connections.accept(future.cause());
                            }
                        });
                        return;
                    }
                    NettyTcpClient.this.scheduleReconnect(tup, this.self);
                } else {
                    if (NettyTcpClient.this.log.isInfoEnabled()) {
                        NettyTcpClient.this.log.info("CONNECT: " + future.channel());
                    }
                    final NettyTcpConnection conn = (NettyTcpConnection)NettyTcpClient.this.select(future.channel());
                    future.channel().closeFuture().addListener((GenericFutureListener)new ChannelFutureListener(){

                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (NettyTcpClient.this.log.isInfoEnabled()) {
                                NettyTcpClient.this.log.info("CLOSED: " + future.channel());
                            }
                            NettyTcpClient.this.connections.remove(future.channel());
                            NettyTcpClient.this.notifyClose(conn);
                            // Only reconnect when the close was not requested through the connection itself.
                            if (!conn.isClosing()) {
                                int attempt = attempts.incrementAndGet();
                                Tuple2<InetSocketAddress, Long> tup = reconnect.reconnect(NettyTcpClient.this.connectAddress, attempt);
                                if (null != tup) {
                                    // Honour the address chosen by the strategy here as well,
                                    // exactly as the connect-failure branch does.
                                    NettyTcpClient.this.scheduleReconnect(tup, self);
                                }
                            }
                        }
                    });
                    future.channel().eventLoop().execute(new Runnable(){

                        @Override
                        public void run() {
                            connections.accept((Object)conn);
                        }
                    });
                }
            }
        };
    }

    /**
     * Applies a reconnect decision: switches the client and the bootstrap to the
     * address chosen by the strategy and schedules a new connect attempt on the
     * environment's root timer after the requested delay.
     *
     * @param tup      non-null result of {@link Reconnect#reconnect}: the address
     *                 to connect to and the delay in milliseconds
     * @param listener listener to attach to the new connect future
     */
    private void scheduleReconnect(Tuple2<InetSocketAddress, Long> tup, final ChannelFutureListener listener) {
        this.connectAddress = (InetSocketAddress)tup.getT1();
        this.bootstrap.remoteAddress((SocketAddress)this.connectAddress);
        long delay = (Long)tup.getT2();
        if (this.log.isInfoEnabled()) {
            this.log.info("Attempting to reconnect to {} after {}ms", (Object)this.connectAddress, (Object)delay);
        }
        this.env.getRootTimer().submit((Consumer)new Consumer<Long>(){

            public void accept(Long now) {
                NettyTcpClient.this.createConnection(listener);
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    /**
     * Starts a connect attempt and attaches {@code listener} to it. Does nothing
     * when the client is closing, because the supplier then yields {@code null}.
     *
     * @param listener listener notified when the connect attempt completes
     */
    private void createConnection(ChannelFutureListener listener) {
        ChannelFuture channel = (ChannelFuture)this.connectionSupplier.get();
        if (channel != null) {
            channel.addListener((GenericFutureListener)listener);
        }
    }
}

