/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.cert.X509Certificate;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.io.netty.bootstrap.Bootstrap;
import org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator;
import org.apache.pulsar.shade.io.netty.channel.Channel;
import org.apache.pulsar.shade.io.netty.channel.ChannelException;
import org.apache.pulsar.shade.io.netty.channel.ChannelFuture;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandler;
import org.apache.pulsar.shade.io.netty.channel.ChannelInitializer;
import org.apache.pulsar.shade.io.netty.channel.ChannelOption;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.io.netty.channel.socket.SocketChannel;
import org.apache.pulsar.shade.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.pulsar.shade.io.netty.handler.ssl.SslContext;
import org.apache.pulsar.shade.io.netty.resolver.dns.DnsNameResolver;
import org.apache.pulsar.shade.io.netty.resolver.dns.DnsNameResolverBuilder;
import org.apache.pulsar.shade.io.netty.util.concurrent.Future;
import org.apache.pulsar.shade.io.netty.util.concurrent.GenericFutureListener;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Caches {@link ClientCnx} connections and opens new ones through a shared Netty {@link Bootstrap}.
 *
 * <p>Connections are stored per logical address and, within an address, per integer slot. The
 * number of slots is taken from {@link ClientConfigurationData#getConnectionsPerBroker()}; a value
 * of {@code 0} disables caching, so every request opens a fresh connection.
 */
public class ConnectionPool implements Closeable {
    /** Cached connection futures: logical address -> slot index -> pending or established connection. */
    private final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup;
    /** Number of slots per logical address; {@code 0} means connections are not cached. */
    private final int maxConnectionsPerHosts;
    private final DnsNameResolver dnsResolver;

    /** Maximum frame length handed to the frame decoder (5 MiB, i.e. {@code 0x500000}). */
    private static final int MAX_MESSAGE_SIZE = 5 * 1024 * 1024;
    /** Pipeline name under which the TLS handler is registered. */
    public static final String TLS_HANDLER = "tls";
    private static final Random random = new Random();
    private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);

    /**
     * Creates the pool, configures the bootstrap used for all outgoing connections and builds the
     * DNS resolver.
     *
     * @param conf client configuration (connections per broker, TCP no-delay, TLS and
     *     authentication settings)
     * @param eventLoopGroup event loop group used for the channels and for the DNS resolver
     */
    public ConnectionPool(final ClientConfigurationData conf, final EventLoopGroup eventLoopGroup) {
        this.eventLoopGroup = eventLoopGroup;
        this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
        this.pool = new ConcurrentHashMap<>();

        this.bootstrap = new Bootstrap();
        this.bootstrap.group(eventLoopGroup);
        this.bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
        this.bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
        this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                if (conf.isUseTls()) {
                    // The SSL context is built for each new channel from the current auth data.
                    AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
                    SslContext sslCtx;
                    if (authData.hasDataForTls()) {
                        sslCtx = SecurityUtility.createNettySslContextForClient(
                                conf.isTlsAllowInsecureConnection(),
                                conf.getTlsTrustCertsFilePath(),
                                (X509Certificate[]) authData.getTlsCertificates(),
                                authData.getTlsPrivateKey());
                    } else {
                        sslCtx = SecurityUtility.createNettySslContextForClient(
                                conf.isTlsAllowInsecureConnection(),
                                conf.getTlsTrustCertsFilePath());
                    }
                    ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
                }
                ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
                ch.pipeline().addLast("frameDecoder",
                        new LengthFieldBasedFrameDecoder(MAX_MESSAGE_SIZE, 0, 4, 0, 4));
                ch.pipeline().addLast("handler", new ClientCnx(conf, eventLoopGroup));
            }
        });

        this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next())
                .traceEnabled(true)
                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup))
                .build();
    }

    /**
     * Gets a connection where the logical and the physical address are the same.
     *
     * @param address address to connect to
     * @return future completed with the connection once its handshake has finished
     */
    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress address) {
        return getConnection(address, address);
    }

    /**
     * Gets a connection for {@code logicalAddress}, opening it against {@code physicalAddress}
     * when no cached entry exists for the randomly selected slot.
     *
     * @param logicalAddress address used as the cache key and, when it differs from the physical
     *     address, set as the target broker on the connection
     * @param physicalAddress address that is resolved and actually connected to
     * @return future completed with the connection, or completed exceptionally when connecting or
     *     the handshake fails
     */
    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddress,
            InetSocketAddress physicalAddress) {
        if (this.maxConnectionsPerHosts == 0) {
            // Caching disabled: always open a new connection (key -1 is never stored in the pool).
            return createConnection(logicalAddress, physicalAddress, -1);
        }

        final int randomKey = signSafeMod(random.nextInt(), this.maxConnectionsPerHosts);
        return this.pool
                .computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>())
                .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
    }

    /**
     * Opens a channel to {@code physicalAddress} and completes the returned future once the
     * {@link ClientCnx} handshake is done. On any failure the future is completed exceptionally
     * and the matching pool entry is removed.
     */
    private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
            InetSocketAddress physicalAddress, int connectionKey) {
        if (log.isDebugEnabled()) {
            log.debug("Connection for {} not found in cache", logicalAddress);
        }

        final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<>();

        createConnection(physicalAddress).thenAccept(channel -> {
            log.info("[{}] Connected to server", channel);

            // Drop the cached entry as soon as the channel is closed.
            channel.closeFuture().addListener(v -> {
                if (log.isDebugEnabled()) {
                    log.debug("Removing closed connection from pool: {}", v);
                }
                cleanupConnection(logicalAddress, connectionKey, cnxFuture);
            });

            final ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
            if (!channel.isActive() || cnx == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Connection was already closed by the time we got notified", channel);
                }
                cnxFuture.completeExceptionally(new ChannelException("Connection already closed"));
                return;
            }

            if (!logicalAddress.equals(physicalAddress)) {
                // Connecting through a different physical endpoint: record the intended broker.
                cnx.setTargetBroker(logicalAddress);
            }
            cnx.setRemoteHostName(physicalAddress.getHostName());

            cnx.connectionFuture().thenRun(() -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Connection handshake completed", cnx.channel());
                }
                cnxFuture.complete(cnx);
            }).exceptionally(exception -> {
                log.warn("[{}] Connection handshake failed: {}", cnx.channel(), exception.getMessage());
                cnxFuture.completeExceptionally(exception);
                cleanupConnection(logicalAddress, connectionKey, cnxFuture);
                cnx.ctx().close();
                return null;
            });
        }).exceptionally(exception -> {
            // NOTE(review): presumably deferred to the event loop so the cleanup never runs
            // re-entrantly inside the computeIfAbsent call in getConnection -- confirm.
            this.eventLoopGroup.execute(() -> {
                log.warn("Failed to open connection to {} : {}", physicalAddress, exception.getMessage());
                cleanupConnection(logicalAddress, connectionKey, cnxFuture);
                cnxFuture.completeExceptionally(new PulsarClientException(exception));
            });
            return null;
        });

        return cnxFuture;
    }

    /**
     * Resolves the host of {@code unresolvedAddress} and connects to the resulting IP addresses,
     * trying them in order.
     */
    private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
        final String hostname = unresolvedAddress.getHostString();
        final int port = unresolvedAddress.getPort();
        return resolveName(hostname)
                .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port));
    }

    /**
     * Tries to connect to each remaining address of the iterator in turn and completes the
     * returned future with the first channel that connects.
     *
     * @param resolvedAddresses remaining candidate addresses; consumed by this call
     * @param port port to connect to
     * @return future completed with the connected channel, or exceptionally with the failure of
     *     the last attempted address (or a {@link ChannelException} when there is no address)
     */
    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> resolvedAddresses,
            int port) {
        final CompletableFuture<Channel> future = new CompletableFuture<>();

        if (!resolvedAddresses.hasNext()) {
            // Nothing to try: fail with a descriptive error instead of a NoSuchElementException.
            future.completeExceptionally(new ChannelException("No resolved addresses available to connect to"));
            return future;
        }

        connectToAddress(resolvedAddresses.next(), port).thenAccept(channel -> {
            future.complete(channel);
        }).exceptionally(exception -> {
            if (resolvedAddresses.hasNext()) {
                // Try the next IP address.
                connectToResolvedAddresses(resolvedAddresses, port).thenAccept(channel -> {
                    future.complete(channel);
                }).exceptionally(ex -> {
                    // Failure of the recursive attempt: propagate it unchanged.
                    future.completeExceptionally(ex);
                    return null;
                });
            } else {
                // No address left to try.
                future.completeExceptionally(exception);
            }
            return null;
        });

        return future;
    }

    /**
     * Resolves {@code hostname} to all of its addresses using the DNS resolver.
     *
     * @param hostname host name to resolve
     * @return future completed with the resolved addresses, or exceptionally with the resolver's
     *     failure cause
     */
    @VisibleForTesting
    CompletableFuture<List<InetAddress>> resolveName(String hostname) {
        final CompletableFuture<List<InetAddress>> future = new CompletableFuture<>();
        this.dnsResolver.resolveAll(hostname).addListener((Future<List<InetAddress>> resolveFuture) -> {
            if (resolveFuture.isSuccess()) {
                // The listener runs after completion, so get() returns the already available result.
                future.complete(resolveFuture.get());
            } else {
                future.completeExceptionally(resolveFuture.cause());
            }
        });
        return future;
    }

    /**
     * Connects the bootstrap to a single IP address and bridges the Netty channel future to a
     * {@link CompletableFuture}.
     */
    private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port) {
        final CompletableFuture<Channel> future = new CompletableFuture<>();
        this.bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> {
            if (channelFuture.isSuccess()) {
                future.complete(channelFuture.channel());
            } else {
                future.completeExceptionally(channelFuture.cause());
            }
        });
        return future;
    }

    /**
     * Starts a graceful shutdown of the event loop group and closes the DNS resolver.
     */
    @Override
    public void close() throws IOException {
        this.eventLoopGroup.shutdownGracefully();
        this.dnsResolver.close();
    }

    /**
     * Removes the pool entry for {@code address}/{@code connectionKey}, but only if it still maps
     * to {@code connectionFuture}, so an entry created in the meantime is left untouched.
     */
    private void cleanupConnection(InetSocketAddress address, int connectionKey,
            CompletableFuture<ClientCnx> connectionFuture) {
        ConcurrentMap<Integer, CompletableFuture<ClientCnx>> map = this.pool.get(address);
        if (map != null) {
            map.remove(connectionKey, connectionFuture);
        }
    }

    /**
     * Computes {@code dividend % divisor} and, when the remainder is negative, adds
     * {@code divisor} to it. For a positive divisor the result is in {@code [0, divisor)}.
     *
     * @param dividend value to reduce
     * @param divisor modulus; must not be {@code 0}
     * @return the adjusted remainder
     * @throws ArithmeticException if {@code divisor} is {@code 0}
     */
    public static int signSafeMod(long dividend, int divisor) {
        int mod = (int) (dividend % (long) divisor);
        if (mod < 0) {
            mod += divisor;
        }
        return mod;
    }
}

