/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.X509Certificate;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.shade.io.netty.bootstrap.Bootstrap;
import org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator;
import org.apache.pulsar.shade.io.netty.channel.ChannelException;
import org.apache.pulsar.shade.io.netty.channel.ChannelFuture;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandler;
import org.apache.pulsar.shade.io.netty.channel.ChannelInitializer;
import org.apache.pulsar.shade.io.netty.channel.ChannelOption;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.io.netty.channel.socket.SocketChannel;
import org.apache.pulsar.shade.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.pulsar.shade.io.netty.handler.ssl.SslContext;
import org.apache.pulsar.shade.io.netty.handler.ssl.SslContextBuilder;
import org.apache.pulsar.shade.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.apache.pulsar.shade.io.netty.util.concurrent.Future;
import org.apache.pulsar.shade.io.netty.util.concurrent.GenericFutureListener;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionPool
implements Closeable {
    private final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup;
    private final int maxConnectionsPerHosts;
    private static final int MaxMessageSize = 0x500000;
    public static final String TLS_HANDLER = "tls";
    private static final Random random = new Random();
    private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);

    public ConnectionPool(final PulsarClientImpl client, EventLoopGroup eventLoopGroup) {
        this.eventLoopGroup = eventLoopGroup;
        this.maxConnectionsPerHosts = client.getConfiguration().getConnectionsPerBroker();
        this.pool = new ConcurrentHashMap();
        this.bootstrap = new Bootstrap();
        this.bootstrap.group(eventLoopGroup);
        this.bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
        this.bootstrap.option(ChannelOption.TCP_NODELAY, client.getConfiguration().isUseTcpNoDelay());
        this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>(){

            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ClientConfiguration clientConfig = client.getConfiguration();
                if (clientConfig.isUseTls()) {
                    SslContextBuilder builder = SslContextBuilder.forClient();
                    if (clientConfig.isTlsAllowInsecureConnection()) {
                        builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
                    } else if (clientConfig.getTlsTrustCertsFilePath().isEmpty()) {
                        builder.trustManager((File)null);
                    } else {
                        File trustCertCollection = new File(clientConfig.getTlsTrustCertsFilePath());
                        builder.trustManager(trustCertCollection);
                    }
                    AuthenticationDataProvider authData = clientConfig.getAuthentication().getAuthData();
                    if (authData.hasDataForTls()) {
                        builder.keyManager(authData.getTlsPrivateKey(), (X509Certificate[])authData.getTlsCertificates());
                    }
                    SslContext sslCtx = builder.build();
                    ch.pipeline().addLast(ConnectionPool.TLS_HANDLER, (ChannelHandler)sslCtx.newHandler(ch.alloc()));
                }
                ch.pipeline().addLast("frameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(0x500000, 0, 4, 0, 4));
                ch.pipeline().addLast("handler", (ChannelHandler)new ClientCnx(client));
            }
        });
    }

    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress address) {
        return this.getConnection(address, address);
    }

    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddress, InetSocketAddress physicalAddress) {
        if (this.maxConnectionsPerHosts == 0) {
            return this.createConnection(logicalAddress, physicalAddress, -1);
        }
        int randomKey = ConnectionPool.signSafeMod(random.nextInt(), this.maxConnectionsPerHosts);
        return this.pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap()).computeIfAbsent(randomKey, k -> this.createConnection(logicalAddress, physicalAddress, randomKey));
    }

    private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress, InetSocketAddress physicalAddress, int connectionKey) {
        if (log.isDebugEnabled()) {
            log.debug("Connection for {} not found in cache", (Object)logicalAddress);
        }
        CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<ClientCnx>();
        this.bootstrap.connect(physicalAddress).addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<ChannelFuture>)future -> {
            if (!future.isSuccess()) {
                log.warn("Failed to open connection to {} : {}", (Object)physicalAddress, (Object)future.cause().getClass().getSimpleName());
                cnxFuture.completeExceptionally(new PulsarClientException(future.cause()));
                this.cleanupConnection(logicalAddress, connectionKey, cnxFuture);
                return;
            }
            log.info("[{}] Connected to server", (Object)future.channel());
            future.channel().closeFuture().addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)v -> {
                if (log.isDebugEnabled()) {
                    log.debug("Removing closed connection from pool: {}", (Object)v);
                }
                this.cleanupConnection(logicalAddress, connectionKey, cnxFuture);
            }));
            ClientCnx cnx = (ClientCnx)future.channel().pipeline().get("handler");
            if (!future.channel().isActive() || cnx == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Connection was already closed by the time we got notified", (Object)future.channel());
                }
                cnxFuture.completeExceptionally(new ChannelException("Connection already closed"));
                return;
            }
            if (!logicalAddress.equals(physicalAddress)) {
                cnx.setTargetBroker(logicalAddress);
            }
            ((CompletableFuture)cnx.connectionFuture().thenRun(() -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Connection handshake completed", (Object)cnx.channel());
                }
                cnxFuture.complete(cnx);
            })).exceptionally(exception -> {
                log.warn("[{}] Connection handshake failed: {}", (Object)cnx.channel(), (Object)exception.getMessage());
                cnxFuture.completeExceptionally((Throwable)exception);
                this.cleanupConnection(logicalAddress, connectionKey, cnxFuture);
                cnx.ctx().close();
                return null;
            });
        }));
        return cnxFuture;
    }

    @Override
    public void close() throws IOException {
        this.eventLoopGroup.shutdownGracefully();
    }

    private void cleanupConnection(InetSocketAddress address, int connectionKey, CompletableFuture<ClientCnx> connectionFuture) {
        ConcurrentMap<Integer, CompletableFuture<ClientCnx>> map = this.pool.get(address);
        if (map != null) {
            map.remove(connectionKey, connectionFuture);
        }
    }

    public static int signSafeMod(long dividend, int divisor) {
        int mod = (int)(dividend % (long)divisor);
        if (mod < 0) {
            mod += divisor;
        }
        return mod;
    }
}

