package reactor.io.net.impl.netty.tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.fn.Consumer;
import reactor.fn.Supplier;
import reactor.fn.tuple.Tuple;
import reactor.fn.tuple.Tuple2;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.Reconnect;
import reactor.io.net.config.ClientSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.netty.NettyChannelHandlerBridge;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.io.net.impl.netty.NettyClientSocketOptions;
import reactor.io.net.impl.netty.NettyNativeDetector;
import reactor.io.net.tcp.TcpClient;
import reactor.io.net.tcp.ssl.SSLEngineSupplier;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.broadcast.BehaviorBroadcaster;
import reactor.rx.broadcast.Broadcaster;

/* loaded from: input_file:reactor/io/net/impl/netty/tcp/NettyTcpClient.class */
public class NettyTcpClient<IN, OUT> extends TcpClient<IN, OUT> {
    private static final Logger log = LoggerFactory.getLogger(NettyTcpClient.class);
    private final NettyClientSocketOptions nettyOptions;
    private final Bootstrap bootstrap;
    private final EventLoopGroup ioGroup;
    private final Supplier<ChannelFuture> connectionSupplier;
    private volatile InetSocketAddress connectAddress;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/io/net/impl/netty/tcp/NettyTcpClient$ReconnectingChannelListener.class */
    public class ReconnectingChannelListener implements ChannelFutureListener {
        private final AtomicInteger attempts;
        private final Reconnect reconnect;
        private final Broadcaster<Tuple2<InetSocketAddress, Integer>> broadcaster;
        private volatile InetSocketAddress connectAddress;

        private ReconnectingChannelListener(InetSocketAddress inetSocketAddress, Reconnect reconnect) {
            this.attempts = new AtomicInteger(0);
            this.broadcaster = BehaviorBroadcaster.create(NettyTcpClient.this.getDefaultEnvironment(), NettyTcpClient.this.getDefaultDispatcher());
            this.connectAddress = inetSocketAddress;
            this.reconnect = reconnect;
        }

        public void operationComplete(final ChannelFuture channelFuture) throws Exception {
            this.broadcaster.onNext(Tuple.of(this.connectAddress, Integer.valueOf(this.attempts.get())));
            if (channelFuture.isSuccess()) {
                if (NettyTcpClient.log.isInfoEnabled()) {
                    NettyTcpClient.log.info("CONNECTED: " + channelFuture.channel());
                }
                final Channel channel = channelFuture.channel();
                channel.pipeline().addLast(new ChannelHandler[]{new ChannelDuplexHandler() { // from class: reactor.io.net.impl.netty.tcp.NettyTcpClient.ReconnectingChannelListener.2
                    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
                        if (NettyTcpClient.log.isInfoEnabled()) {
                            NettyTcpClient.log.info("CLOSED: " + channel);
                        }
                        Tuple2<InetSocketAddress, Long> reconnect = ReconnectingChannelListener.this.reconnect.reconnect(ReconnectingChannelListener.this.connectAddress, ReconnectingChannelListener.this.attempts.incrementAndGet());
                        if (null == reconnect) {
                            ReconnectingChannelListener.this.broadcaster.onComplete();
                        } else {
                            ReconnectingChannelListener.this.attemptReconnect(reconnect);
                            super.channelInactive(channelHandlerContext);
                        }
                    }
                }});
                return;
            }
            int incrementAndGet = this.attempts.incrementAndGet();
            Tuple2<InetSocketAddress, Long> reconnect = this.reconnect.reconnect(this.connectAddress, incrementAndGet);
            if (null != reconnect) {
                attemptReconnect(reconnect);
                return;
            }
            if (NettyTcpClient.log.isErrorEnabled()) {
                NettyTcpClient.log.error("Reconnection to {} failed after {} attempts. Giving up.", this.connectAddress, Integer.valueOf(incrementAndGet - 1));
            }
            channelFuture.channel().eventLoop().submit(new Runnable() { // from class: reactor.io.net.impl.netty.tcp.NettyTcpClient.ReconnectingChannelListener.1
                @Override // java.lang.Runnable
                public void run() {
                    ReconnectingChannelListener.this.broadcaster.onError(channelFuture.cause());
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void attemptReconnect(Tuple2<InetSocketAddress, Long> tuple2) {
            this.connectAddress = (InetSocketAddress) tuple2.getT1();
            NettyTcpClient.this.bootstrap.remoteAddress(this.connectAddress);
            long longValue = ((Long) tuple2.getT2()).longValue();
            if (NettyTcpClient.log.isInfoEnabled()) {
                NettyTcpClient.log.info("Failed to connect to {}. Attempting reconnect in {}ms.", this.connectAddress, Long.valueOf(longValue));
            }
            NettyTcpClient.this.getDefaultEnvironment().getTimer().submit(new Consumer<Long>() { // from class: reactor.io.net.impl.netty.tcp.NettyTcpClient.ReconnectingChannelListener.3
                public void accept(Long l) {
                    NettyTcpClient.this.openChannel(ReconnectingChannelListener.this);
                }
            }, longValue, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Creates a Netty-backed TCP client and prepares (but does not open) its bootstrap.
     *
     * @param environment         may be {@code null}; when present it supplies the
     *                            {@code reactor.tcp.ioThreadCount} and
     *                            {@code reactor.tcp.netty.alloc} properties
     * @param dispatcher          passed through to the parent client
     * @param inetSocketAddress   initial remote address
     * @param clientSocketOptions may be {@code null}; a {@link NettyClientSocketOptions}
     *                            can additionally supply an event loop group
     * @param sslOptions          may be {@code null}; AUTO_READ is enabled only when present
     * @param codec               passed through to the parent client
     */
    public NettyTcpClient(Environment environment, Dispatcher dispatcher, InetSocketAddress inetSocketAddress, ClientSocketOptions clientSocketOptions, SslOptions sslOptions, Codec<Buffer, IN, OUT> codec) {
        super(environment, dispatcher, inetSocketAddress, clientSocketOptions, sslOptions, codec);
        this.connectAddress = inetSocketAddress;
        if (clientSocketOptions instanceof NettyClientSocketOptions) {
            this.nettyOptions = (NettyClientSocketOptions) clientSocketOptions;
        } else {
            this.nettyOptions = null;
        }

        // Use a caller-supplied event loop group when there is one; otherwise create our own.
        if (null == this.nettyOptions || null == this.nettyOptions.eventLoopGroup()) {
            int ioThreadCount = environment != null
                    ? environment.getIntProperty("reactor.tcp.ioThreadCount", Environment.PROCESSORS)
                    : Environment.PROCESSORS;
            this.ioGroup = NettyNativeDetector.newEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));
        } else {
            this.ioGroup = this.nettyOptions.eventLoopGroup();
        }

        Bootstrap option = new Bootstrap().group(this.ioGroup).channel(NettyNativeDetector.getChannel(this.ioGroup)).option(ChannelOption.AUTO_READ, Boolean.valueOf(sslOptions != null));
        if (environment != null && Boolean.parseBoolean(environment.getProperty("reactor.tcp.netty.alloc", "true"))) {
            option = (Bootstrap) option.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }
        if (clientSocketOptions != null) {
            option = (Bootstrap) option
                    .option(ChannelOption.SO_RCVBUF, Integer.valueOf(clientSocketOptions.rcvbuf()))
                    .option(ChannelOption.SO_SNDBUF, Integer.valueOf(clientSocketOptions.sndbuf()))
                    .option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(clientSocketOptions.keepAlive()))
                    .option(ChannelOption.SO_LINGER, Integer.valueOf(clientSocketOptions.linger()))
                    .option(ChannelOption.TCP_NODELAY, Boolean.valueOf(clientSocketOptions.tcpNoDelay()));
        }
        this.bootstrap = option;

        this.connectionSupplier = new Supplier<ChannelFuture>() {
            /**
             * Starts a connect to the current address, or returns {@code null} when
             * the client is not started.
             */
            @Override
            public ChannelFuture get() {
                // Must be named get(): openChannel calls connectionSupplier.get(), and the
                // decompiler-renamed m24get() did not implement the Supplier interface.
                if (NettyTcpClient.this.started.get()) {
                    return NettyTcpClient.this.bootstrap.connect(NettyTcpClient.this.getConnectAddress());
                }
                return null;
            }
        };
    }

    /**
     * Installs the handler and opens a single connection, without reconnect support.
     *
     * <p>The returned promise is completed when the connect future succeeds and is
     * errored with the future's cause otherwise. If {@code openChannel} obtains no
     * connect future (the connection supplier yields {@code null} while the client
     * is not started), no listener is attached and this method never resolves the
     * promise.
     *
     * @param reactorChannelHandler handler bound to the new channel
     * @return promise tracking the outcome of the connect
     */
    @Override // reactor.io.net.ReactorPeer
    protected Promise<Void> doStart(ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> reactorChannelHandler) {
        final Promise<Void> connected = Promises.prepare();
        final ChannelFutureListener outcomeListener = new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (!channelFuture.isSuccess()) {
                    connected.onError(channelFuture.cause());
                    return;
                }
                connected.onComplete();
            }
        };
        addHandler(reactorChannelHandler);
        openChannel(outcomeListener);
        return connected;
    }

    /**
     * Registers the channel initializer on the bootstrap.
     *
     * <p>For each new channel the initializer applies the connect timeout, adds the
     * SSL handler or disables auto-read, runs the optional Netty pipeline configurer
     * and finally binds the Reactor handler.
     *
     * @param reactorChannelHandler handler to bind; {@code PING} is used when {@code null}
     */
    private void addHandler(ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> reactorChannelHandler) {
        final ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> effectiveHandler = null == reactorChannelHandler ? PING : reactorChannelHandler;
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel socketChannel) throws Exception {
                // The constructor accepts null socket options, so guard the dereference;
                // without options the channel keeps Netty's default connect timeout.
                if (null != NettyTcpClient.this.getOptions()) {
                    socketChannel.config().setConnectTimeoutMillis(NettyTcpClient.this.getOptions().timeout());
                }
                if (null != NettyTcpClient.this.getSslOptions()) {
                    NettyTcpClient.this.addSecureHandler(socketChannel);
                } else {
                    socketChannel.config().setAutoRead(false);
                }
                if (null != NettyTcpClient.this.nettyOptions && null != NettyTcpClient.this.nettyOptions.pipelineConfigurer()) {
                    NettyTcpClient.this.nettyOptions.pipelineConfigurer().accept(socketChannel.pipeline());
                }
                NettyTcpClient.this.bindChannel(effectiveHandler, socketChannel);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Appends an {@link SslHandler} to the channel's pipeline, using an engine built
     * from this client's SSL options.
     *
     * @param socketChannel channel whose pipeline receives the SSL handler
     * @throws Exception if building the SSL engine fails
     */
    public void addSecureHandler(SocketChannel socketChannel) throws Exception {
        // NOTE(review): m31get() looks like a decompiler rename of get() — confirm against SSLEngineSupplier.
        final SSLEngine engine = new SSLEngineSupplier(getSslOptions(), true).m31get();
        if (log.isDebugEnabled()) {
            Object keystoreLabel = "<DEFAULT>";
            if (null != getSslOptions() && null != getSslOptions().keystoreFile()) {
                keystoreLabel = getSslOptions().keystoreFile();
            }
            log.debug("SSL enabled using keystore {}", keystoreLabel);
        }
        final SslHandler sslHandler = new SslHandler(engine);
        socketChannel.pipeline().addLast(sslHandler);
    }

    /**
     * Installs the handler and opens a connection whose failures and closes are
     * handled by the given reconnect strategy.
     *
     * @param reactorChannelHandler handler bound to every channel that is opened
     * @param reconnect             strategy consulted by the reconnecting listener
     * @return the listener's stream of (address, attempt count) tuples
     */
    @Override // reactor.io.net.ReactorClient
    protected Stream<Tuple2<InetSocketAddress, Integer>> doStart(ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> reactorChannelHandler, Reconnect reconnect) {
        final ReconnectingChannelListener listener =
                new ReconnectingChannelListener(this.connectAddress, reconnect);
        addHandler(reactorChannelHandler);
        openChannel(listener);
        return listener.broadcaster;
    }

    /**
     * Shuts down the I/O event loop group when this client created it.
     *
     * <p>An event loop group supplied through the Netty options is left running and
     * an already-successful promise is returned.
     *
     * @return promise completed when the graceful shutdown succeeds, errored with
     *         the shutdown future's cause otherwise
     */
    @Override // reactor.io.net.ReactorPeer
    protected Promise<Void> doShutdown() {
        final boolean groupIsExternal = this.nettyOptions != null && this.nettyOptions.eventLoopGroup() != null;
        if (groupIsExternal) {
            return Promises.success();
        }

        final Promise<Void> terminated = Promises.prepare();
        final FutureListener<Object> onShutdown = new FutureListener<Object>() {
            public void operationComplete(Future<Object> future) throws Exception {
                if (!future.isDone() || !future.isSuccess()) {
                    terminated.onError(future.cause());
                    return;
                }
                terminated.onComplete();
            }
        };
        this.ioGroup.shutdownGracefully().addListener(onShutdown);
        return terminated;
    }

    /**
     * Wraps the channel in a {@link NettyChannelStream} and appends the bridge that
     * connects it to the Reactor handler. A {@link LoggingHandler} is placed ahead
     * of the bridge when debug logging is enabled.
     *
     * @param reactorChannelHandler handler given to the bridge
     * @param socketChannel         channel being initialised
     */
    protected void bindChannel(ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> reactorChannelHandler, SocketChannel socketChannel) {
        final NettyChannelStream stream = new NettyChannelStream(
                getDefaultEnvironment(),
                getDefaultCodec(),
                getDefaultPrefetchSize(),
                getDefaultDispatcher(),
                socketChannel);
        final ChannelPipeline channelPipeline = socketChannel.pipeline();
        if (log.isDebugEnabled()) {
            channelPipeline.addLast(new LoggingHandler(NettyTcpClient.class));
        }
        channelPipeline.addLast(new NettyChannelHandlerBridge(reactorChannelHandler, stream));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Requests a connect future from the connection supplier and attaches the listener to it.
     *
     * <p>The supplier is always invoked. Nothing is attached when it yields
     * {@code null} or when the listener is {@code null}.
     *
     * @param channelFutureListener listener notified when the connect completes; may be {@code null}
     */
    public void openChannel(ChannelFutureListener channelFutureListener) {
        final ChannelFuture pendingConnect = this.connectionSupplier.get();
        if (pendingConnect != null && channelFutureListener != null) {
            pendingConnect.addListener(channelFutureListener);
        }
    }
}
