/*
 * Decompiled with CFR 0.152.
 */
package reactor.netty.resources;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
import reactor.netty.internal.shaded.reactor.pool.PoolBuilder;
import reactor.netty.internal.shaded.reactor.pool.PoolConfig;
import reactor.netty.internal.shaded.reactor.pool.PooledRef;
import reactor.netty.internal.shaded.reactor.pool.PooledRefMetadata;
import reactor.netty.resources.ConnectionPoolMetrics;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.MicrometerPooledConnectionProviderMeterRegistrar;
import reactor.netty.resources.NewConnectionProvider;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.NonNull;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

final class PooledConnectionProvider
implements ConnectionProvider {
    // Pools currently managed by this provider, one per PoolKey (remote address + pipeline handler hash).
    final ConcurrentMap<PoolKey, InstrumentedPool<PooledConnection>> channelPools = PlatformDependent.newConcurrentHashMap();
    // Metrics wrappers recorded when a pool is created with metrics enabled (written in acquire()).
    // NOTE(review): this WeakHashMap is not synchronized and is written from inside
    // channelPools.computeIfAbsent; confirm pools for different keys cannot be created concurrently.
    private final Map<PoolKey, ConnectionPoolMetrics> poolMetrics = new WeakHashMap<PoolKey, ConnectionPoolMetrics>();
    // Provider name; used in log messages and passed to the metrics registrar.
    final String name;
    // Per-remote-address pool settings, filled once in the constructor from builder.confPerRemoteHost.
    final Map<SocketAddress, PoolFactory> poolFactoryPerRemoteHost = new HashMap<SocketAddress, PoolFactory>();
    // Pool settings used for any remote address without a dedicated entry above.
    final PoolFactory defaultPoolFactory;
    static final Logger log = Loggers.getLogger(PooledConnectionProvider.class);
    // Channel attribute holding the ConnectionObserver that currently owns a pooled channel.
    static final AttributeKey<ConnectionObserver> OWNER = AttributeKey.valueOf((String)"connectionOwner");

    /**
     * Creates the provider from a builder.
     *
     * <p>The builder itself supplies the default pool settings; every entry of
     * {@code builder.confPerRemoteHost} becomes a dedicated {@link PoolFactory}
     * for that remote address.
     *
     * @param builder the configured builder (name, default spec, per-host specs)
     */
    PooledConnectionProvider(ConnectionProvider.Builder builder) {
        this.name = builder.name;
        this.defaultPoolFactory = new PoolFactory(builder);
        builder.confPerRemoteHost.forEach(
                (remoteHost, spec) -> this.poolFactoryPerRemoteHost.put(remoteHost, new PoolFactory(spec)));
    }

    /**
     * Disposes every pool whose remote address matches {@code address}
     * (see {@code compareAddresses}).
     *
     * <p>Matching entries are snapshotted first; each one is then removed with the
     * two-argument {@code remove(key, value)} so a pool is only disposed if it is
     * still the one mapped under that key.
     *
     * @param address the remote address whose pools should be disposed
     */
    @Override
    public void disposeWhen(@NonNull SocketAddress address) {
        List<Map.Entry<PoolKey, InstrumentedPool<PooledConnection>>> matching = new ArrayList<>();
        for (Map.Entry<PoolKey, InstrumentedPool<PooledConnection>> candidate : this.channelPools.entrySet()) {
            if (this.compareAddresses(candidate.getKey().holder, address)) {
                matching.add(candidate);
            }
        }
        for (Map.Entry<PoolKey, InstrumentedPool<PooledConnection>> entry : matching) {
            if (!this.channelPools.remove(entry.getKey(), entry.getValue())) {
                // Someone else already replaced or removed this mapping.
                continue;
            }
            if (log.isDebugEnabled()) {
                log.debug("Disposing pool for {}", entry.getKey().fqdn);
            }
            entry.getValue().dispose();
        }
    }

    /**
     * Decides whether a pool's address ({@code origin}) matches a requested address ({@code target}).
     *
     * <p>The addresses match when they are equal, or when both are
     * {@link InetSocketAddress} instances with the same port and either the target
     * resolves to the wildcard ("any local") address or both host strings are equal.
     *
     * @param origin the address stored in the pool key
     * @param target the address to compare against
     * @return {@code true} if the two addresses are considered the same endpoint
     */
    private boolean compareAddresses(SocketAddress origin, SocketAddress target) {
        if (origin.equals(target)) {
            return true;
        }
        if (!(origin instanceof InetSocketAddress) || !(target instanceof InetSocketAddress)) {
            return false;
        }
        InetSocketAddress poolAddress = (InetSocketAddress) origin;
        InetSocketAddress requested = (InetSocketAddress) target;
        if (poolAddress.getPort() != requested.getPort()) {
            return false;
        }
        InetAddress requestedIp = requested.getAddress();
        if (requestedIp != null && requestedIp.isAnyLocalAddress()) {
            return true;
        }
        return Objects.equals(poolAddress.getHostString(), requested.getHostString());
    }

    /**
     * Acquires a connection for the given bootstrap from the matching pool.
     *
     * <p>On subscription the bootstrap is cloned, its remote address and handler
     * are turned into a {@link PoolKey}, and the pool for that key is created on
     * first use (registering metrics when enabled on the pool factory or on the
     * bootstrap). A {@link DisposableAcquire} is then subscribed to the pool and
     * completes the returned {@code Mono}.
     *
     * @param b the client bootstrap describing the remote endpoint and pipeline
     * @return a {@code Mono} emitting the acquired {@link Connection}
     */
    public Mono<Connection> acquire(Bootstrap b) {
        // The sink type is inferred as MonoSink<Connection>; the decompiled "(T sink)" named a type that does not exist.
        return Mono.create(sink -> {
            Bootstrap bootstrap = b.clone();
            ChannelOperations.OnSetup opsFactory = BootstrapHandlers.channelOperationFactory(bootstrap);
            ConnectionObserver obs = BootstrapHandlers.connectionObserver(bootstrap);
            NewConnectionProvider.convertLazyRemoteAddress(bootstrap);
            ChannelHandler handler = bootstrap.config().handler();
            SocketAddress remoteAddress = bootstrap.config().remoteAddress();
            // -1 stands in for "no handler" in the pipeline part of the key.
            PoolKey holder = new PoolKey(remoteAddress, handler != null ? handler.hashCode() : -1);
            PoolFactory poolFactory = this.poolFactoryPerRemoteHost.getOrDefault(remoteAddress, this.defaultPoolFactory);
            InstrumentedPool<PooledConnection> pool = this.channelPools.computeIfAbsent(holder, poolKey -> {
                if (log.isDebugEnabled()) {
                    log.debug("Creating a new [{}] client pool [{}] for [{}]", this.name, poolFactory, remoteAddress);
                }
                InstrumentedPool<PooledConnection> newPool = new PooledConnectionAllocator(bootstrap, poolFactory, opsFactory).pool;
                if (poolFactory.metricsEnabled || BootstrapHandlers.findMetricsSupport(bootstrap) != null) {
                    MicrometerPooledConnectionProviderMeterRegistrar registrar = poolFactory.registrar != null ? poolFactory.registrar.get() : MicrometerPooledConnectionProviderMeterRegistrar.INSTANCE;
                    ConnectionPoolMetrics.DelegatingConnectionPoolMetrics metrics = new ConnectionPoolMetrics.DelegatingConnectionPoolMetrics(newPool.metrics());
                    this.poolMetrics.put(poolKey, metrics);
                    registrar.registerMetrics(this.name, poolKey.hashCode() + "", remoteAddress, metrics);
                }
                return newPool;
            });
            PooledConnectionProvider.disposableAcquire(new DisposableAcquire(sink, pool, obs, opsFactory, poolFactory.pendingAcquireTimeout, false));
        });
    }

    /**
     * Lazily disposes every pool owned by this provider.
     *
     * <p>On subscription each key is removed from {@code channelPools} and the
     * removed pool's {@code disposeLater()} is collected. A key can disappear
     * between the key-set iteration and the {@code remove} call (for example
     * when {@code disposeWhen} runs concurrently); such keys are skipped instead
     * of dereferencing the {@code null} returned by {@code remove}.
     *
     * @return a {@code Mono} completing when all collected pools are disposed,
     *         or an empty {@code Mono} if there was nothing to dispose
     */
    @Override
    public Mono<Void> disposeLater() {
        return Mono.defer(() -> {
            List<Mono<Void>> pools = new ArrayList<>();
            for (PoolKey key : this.channelPools.keySet()) {
                InstrumentedPool<PooledConnection> removed = this.channelPools.remove(key);
                if (removed != null) {
                    pools.add(removed.disposeLater());
                }
            }
            if (pools.isEmpty()) {
                return Mono.empty();
            }
            return Mono.when(pools);
        });
    }

    /**
     * Reports whether this provider holds no live pool.
     *
     * @return {@code true} when there are no pools or every remaining pool is already disposed
     */
    public boolean isDisposed() {
        for (InstrumentedPool<PooledConnection> pool : this.channelPools.values()) {
            if (!pool.isDisposed()) {
                return false;
            }
        }
        // Reached for an empty map as well as for a map of only-disposed pools.
        return true;
    }

    /**
     * Returns the connection limit of the default pool settings.
     *
     * @return {@code maxConnections} of the default {@link PoolFactory}; per-host overrides are not considered
     */
    @Override
    public int maxConnections() {
        PoolFactory fallback = this.defaultPoolFactory;
        return fallback.maxConnections;
    }

    /**
     * Starts a pool acquisition for the given subscriber.
     *
     * <p>The subscriber's {@code pendingAcquireTimeout} (milliseconds) is passed
     * to the pool's {@code acquire} call, and the subscriber itself is subscribed
     * to the result.
     *
     * @param disposableAcquire the acquisition state/subscriber to drive
     */
    static void disposableAcquire(DisposableAcquire disposableAcquire) {
        Duration timeout = Duration.ofMillis(disposableAcquire.pendingAcquireTimeout);
        disposableAcquire.pool.acquire(timeout).subscribe(disposableAcquire);
    }

    /**
     * Immutable snapshot of one {@code ConnectionPoolSpec}, able to build pools from it.
     *
     * <p>Durations are stored as milliseconds; an unset max idle/life time is stored as {@code -1}.
     */
    static final class PoolFactory {
        final Duration evictionInterval;
        final int maxConnections;
        final int pendingAcquireMaxCount;
        final long pendingAcquireTimeout;
        final long maxIdleTime;
        final long maxLifeTime;
        final boolean metricsEnabled;
        final String leasingStrategy;
        final Supplier<? extends ConnectionProvider.MeterRegistrar> registrar;

        /** Evicts a connection whose channel is inactive or which is not persistent. */
        static final BiPredicate<PooledConnection, PooledRefMetadata> DEFAULT_EVICTION_PREDICATE =
                (candidate, metadata) -> !(candidate.channel.isActive() && candidate.isPersistent());

        /** Closes the channel of a destroyed connection if it is still active. */
        static final Function<PooledConnection, Publisher<Void>> DEFAULT_DESTROY_HANDLER = doomed -> {
            if (doomed.channel.isActive()) {
                return FutureMono.from(doomed.channel.close());
            }
            return Mono.empty();
        };

        /**
         * Copies the settings out of the given spec.
         *
         * @param conf the pool specification to snapshot
         */
        PoolFactory(ConnectionProvider.ConnectionPoolSpec<?> conf) {
            this.evictionInterval = conf.evictionInterval;
            this.maxConnections = conf.maxConnections;
            // -2 is replaced by twice the connection limit
            // (presumably the spec's "not specified" marker — confirm against ConnectionPoolSpec).
            if (conf.pendingAcquireMaxCount == -2) {
                this.pendingAcquireMaxCount = 2 * conf.maxConnections;
            } else {
                this.pendingAcquireMaxCount = conf.pendingAcquireMaxCount;
            }
            this.pendingAcquireTimeout = conf.pendingAcquireTimeout.toMillis();
            this.maxIdleTime = conf.maxIdleTime == null ? -1L : conf.maxIdleTime.toMillis();
            this.maxLifeTime = conf.maxLifeTime == null ? -1L : conf.maxLifeTime.toMillis();
            this.metricsEnabled = conf.metricsEnabled;
            this.leasingStrategy = conf.leasingStrategy;
            this.registrar = conf.registrar;
        }

        /**
         * Builds a pool that allocates connections from {@code allocator}.
         *
         * <p>Eviction combines {@link #DEFAULT_EVICTION_PREDICATE} with the configured
         * max idle and max life times. The {@code "fifo"} leasing strategy selects
         * LRU reuse order; anything else selects MRU reuse order.
         *
         * @param allocator publisher that creates a new pooled connection
         * @return the configured pool
         */
        InstrumentedPool<PooledConnection> newPool(Publisher<PooledConnection> allocator) {
            BiPredicate<PooledConnection, PooledRefMetadata> expired = (poolable, meta) -> {
                if (this.maxIdleTime != -1L && meta.idleTime() >= this.maxIdleTime) {
                    return true;
                }
                return this.maxLifeTime != -1L && meta.lifeTime() >= this.maxLifeTime;
            };
            PoolBuilder<PooledConnection, PoolConfig<PooledConnection>> poolBuilder = PoolBuilder.from(allocator)
                    .destroyHandler(DEFAULT_DESTROY_HANDLER)
                    .evictionPredicate(DEFAULT_EVICTION_PREDICATE.or(expired))
                    .maxPendingAcquire(this.pendingAcquireMaxCount)
                    .sizeBetween(0, this.maxConnections)
                    .evictInBackground(this.evictionInterval);
            boolean fifo = "fifo".equals(this.leasingStrategy);
            if (fifo) {
                poolBuilder = poolBuilder.idleResourceReuseLruOrder();
            } else {
                poolBuilder = poolBuilder.idleResourceReuseMruOrder();
            }
            return poolBuilder.buildPool();
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("PoolFactory{evictionInterval=");
            text.append(this.evictionInterval);
            text.append(", leasingStrategy=").append(this.leasingStrategy);
            text.append(", maxConnections=").append(this.maxConnections);
            text.append(", maxIdleTime=").append(this.maxIdleTime);
            text.append(", maxLifeTime=").append(this.maxLifeTime);
            text.append(", metricsEnabled=").append(this.metricsEnabled);
            text.append(", pendingAcquireMaxCount=").append(this.pendingAcquireMaxCount);
            text.append(", pendingAcquireTimeout=").append(this.pendingAcquireTimeout);
            return text.append('}').toString();
        }
    }

    /**
     * Identity of one connection pool: the remote address, a key for the
     * pipeline configuration, and a printable form of the address.
     */
    static final class PoolKey {
        final SocketAddress holder;
        final int pipelineKey;
        final String fqdn;

        /**
         * @param holder      the remote address the pool connects to
         * @param pipelineKey integer identifying the pipeline/handler configuration
         */
        PoolKey(SocketAddress holder, int pipelineKey) {
            this.holder = holder;
            this.pipelineKey = pipelineKey;
            // Only inet addresses get a printable name; everything else is labelled "null".
            if (holder instanceof InetSocketAddress) {
                this.fqdn = holder.toString();
            } else {
                this.fqdn = "null";
            }
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != this.getClass()) {
                return false;
            }
            PoolKey other = (PoolKey) o;
            if (this.pipelineKey != other.pipelineKey) {
                return false;
            }
            if (!Objects.equals(this.holder, other.holder)) {
                return false;
            }
            return Objects.equals(this.fqdn, other.fqdn);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.holder, this.pipelineKey, this.fqdn);
        }
    }

    /**
     * State of a single acquisition from a pool.
     *
     * <p>It subscribes to the pool's {@code acquire} result, installs itself as the
     * owner of the acquired channel (on the channel's event loop) and completes the
     * caller's {@link MonoSink}. An inactive channel is invalidated and the
     * acquisition is retried once before failing the sink.
     */
    static final class DisposableAcquire
    implements ConnectionObserver,
    Runnable,
    CoreSubscriber<PooledRef<PooledConnection>>,
    Disposable {
        final Disposable.Composite cancellations;
        final MonoSink<Connection> sink;
        final InstrumentedPool<PooledConnection> pool;
        final ConnectionObserver obs;
        final ChannelOperations.OnSetup opsFactory;
        final long pendingAcquireTimeout;
        final boolean retried;
        PooledRef<PooledConnection> pooledRef;
        Subscription subscription;

        /**
         * Creates the state for a fresh acquisition with its own cancellation composite.
         */
        DisposableAcquire(MonoSink<Connection> sink, InstrumentedPool<PooledConnection> pool, ConnectionObserver obs, ChannelOperations.OnSetup opsFactory, long pendingAcquireTimeout, boolean retried) {
            this.sink = sink;
            this.pool = pool;
            this.obs = obs;
            this.opsFactory = opsFactory;
            this.pendingAcquireTimeout = pendingAcquireTimeout;
            this.retried = retried;
            this.cancellations = Disposables.composite();
        }

        /**
         * Creates the retry of {@code parent}: it shares the parent's sink, pool,
         * observer and cancellation composite, and is always marked as retried.
         */
        DisposableAcquire(DisposableAcquire parent) {
            this.sink = parent.sink;
            this.pool = parent.pool;
            this.obs = parent.obs;
            this.opsFactory = parent.opsFactory;
            this.pendingAcquireTimeout = parent.pendingAcquireTimeout;
            this.cancellations = parent.cancellations;
            this.retried = true;
        }

        /** Stores the pooled reference and continues in {@link #run()} on the channel's event loop. */
        public void onNext(PooledRef<PooledConnection> value) {
            this.pooledRef = value;
            PooledConnection acquired = value.poolable();
            acquired.pooledRef = value;
            Channel ch = acquired.channel;
            if (!ch.eventLoop().inEventLoop()) {
                ch.eventLoop().execute(this);
                return;
            }
            this.run();
        }

        /** Cancels the subscription to the pool. */
        public void dispose() {
            this.subscription.cancel();
        }

        /** Forwards an acquisition failure to the caller's sink. */
        public void onError(Throwable throwable) {
            this.sink.error(throwable);
        }

        /**
         * Registers this acquisition for cancellation and requests from the pool.
         * Only a first attempt hooks the shared composite onto the sink.
         */
        public void onSubscribe(Subscription s) {
            if (!Operators.validate(this.subscription, s)) {
                return;
            }
            this.subscription = s;
            this.cancellations.add(this);
            if (!this.retried) {
                this.sink.onCancel(this.cancellations);
            }
            s.request(Long.MAX_VALUE);
        }

        /** Nothing to do on completion; the sink is completed from the other callbacks. */
        public void onComplete() {
        }

        @Override
        public Context currentContext() {
            return this.sink.currentContext();
        }

        @Override
        public void onUncaughtException(Connection connection, Throwable error) {
            this.sink.error(error);
            this.obs.onUncaughtException(connection, error);
        }

        @Override
        public void onStateChange(Connection connection, ConnectionObserver.State newState) {
            if (ConnectionObserver.State.CONFIGURED == newState) {
                this.sink.success(connection);
            }
            this.obs.onStateChange(connection, newState);
        }

        /**
         * Takes ownership of the acquired channel.
         *
         * <p>An inactive channel is invalidated and re-acquired once. Otherwise
         * this object replaces the channel's {@code OWNER} attribute: if there was
         * no real previous owner (none, or only a pending-event buffer) the channel
         * is new, so a close hook is registered and buffered events are replayed;
         * if there was a previous owner the channel is being reused and a new
         * operations object is created for it.
         */
        @Override
        public void run() {
            PooledConnection pooledConnection = this.pooledRef.poolable();
            Channel c = pooledConnection.channel;
            if (!c.isActive()) {
                this.invalidateAndRetry(c);
                return;
            }
            ConnectionObserver previousOwner = c.attr(OWNER).getAndSet(this);
            boolean buffered = previousOwner instanceof PendingConnectionObserver;
            if (previousOwner == null || buffered) {
                this.registerClose(this.pooledRef, this.pool);
                if (buffered) {
                    this.replayPending((PendingConnectionObserver) previousOwner);
                }
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(c, "Channel connected, now {} active connections and {} inactive connections"), this.pool.metrics().acquiredSize(), this.pool.metrics().idleSize());
                }
                if (this.opsFactory == ChannelOperations.OnSetup.empty()) {
                    this.sink.success(Connection.from(c));
                }
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(c, "Channel acquired, now {} active connections and {} inactive connections"), this.pool.metrics().acquiredSize(), this.pool.metrics().idleSize());
            }
            this.obs.onStateChange(pooledConnection, ConnectionObserver.State.ACQUIRED);
            ChannelOperations<?, ?> ops = this.opsFactory.create(pooledConnection, pooledConnection, null);
            if (ops == null) {
                this.sink.success(pooledConnection);
                return;
            }
            ops.bind();
            this.sink.success(ops);
            this.obs.onStateChange(ops, ConnectionObserver.State.CONFIGURED);
        }

        /** Invalidates the inactive channel's pooled reference, then retries once or fails the sink. */
        private void invalidateAndRetry(Channel c) {
            this.pooledRef.invalidate().subscribe(null, null, () -> {
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(c, "Channel closed, now {} active connections and {} inactive connections"), this.pool.metrics().acquiredSize(), this.pool.metrics().idleSize());
                }
            });
            if (this.retried) {
                this.sink.error(new IOException("Error while acquiring from " + this.pool));
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(c, "Immediately aborted pooled channel, re-acquiring new channel"));
            }
            PooledConnectionProvider.disposableAcquire(new DisposableAcquire(this));
        }

        /** Replays, in order, the events buffered before this acquisition became the channel owner. */
        private void replayPending(PendingConnectionObserver pending) {
            PendingConnectionObserver.Pending event;
            while ((event = pending.pendingQueue.poll()) != null) {
                if (event.error != null) {
                    this.onUncaughtException(event.connection, event.error);
                } else if (event.state != null) {
                    this.onStateChange(event.connection, event.state);
                }
            }
        }

        /**
         * Hooks the channel's close future so that, when the channel closes while
         * owned by a {@code DisposableAcquire}, that owner's pooled reference is invalidated.
         */
        void registerClose(PooledRef<PooledConnection> pooledRef, InstrumentedPool<PooledConnection> pool) {
            Channel channel = pooledRef.poolable().channel;
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(channel, "Registering pool release on close event for channel"));
            }
            channel.closeFuture().addListener(closed -> {
                ConnectionObserver owner = channel.attr(OWNER).get();
                if (!(owner instanceof DisposableAcquire)) {
                    return;
                }
                ((DisposableAcquire) owner).pooledRef.invalidate().subscribe(null, null, () -> {
                    if (log.isDebugEnabled()) {
                        log.debug(ReactorNetty.format(channel, "Channel closed, now {} active connections and {} inactive connections"), pool.metrics().acquiredSize(), pool.metrics().idleSize());
                    }
                });
            });
        }
    }

    /**
     * A pooled channel exposed as a {@link Connection}.
     *
     * <p>It also acts as the channel's {@link ConnectionObserver}: events are
     * forwarded to whatever observer is currently stored in the channel's
     * {@code OWNER} attribute, and a {@code DISCONNECTING} event on a live,
     * persistent channel releases the connection back to the pool.
     */
    static final class PooledConnection
    implements Connection,
    ConnectionObserver {
        final Channel channel;
        final InstrumentedPool<PooledConnection> pool;
        final MonoProcessor<Void> onTerminate;
        PooledRef<PooledConnection> pooledRef;

        PooledConnection(Channel channel, InstrumentedPool<PooledConnection> pool) {
            this.channel = channel;
            this.pool = pool;
            this.onTerminate = MonoProcessor.create();
        }

        /**
         * Returns the channel's current owner, installing a
         * {@link PendingConnectionObserver} placeholder when there is none yet.
         */
        ConnectionObserver owner() {
            for (;;) {
                ConnectionObserver existing = this.channel.attr(OWNER).get();
                if (existing != null) {
                    return existing;
                }
                ConnectionObserver placeholder = new PendingConnectionObserver();
                if (this.channel.attr(OWNER).compareAndSet(null, placeholder)) {
                    return placeholder;
                }
                // Lost the race against another writer: read the attribute again.
            }
        }

        @Override
        public Mono<Void> onTerminate() {
            Mono<Void> disposed = this.onDispose();
            return this.onTerminate.or(disposed);
        }

        @Override
        public Channel channel() {
            return this.channel;
        }

        @Override
        public Context currentContext() {
            return this.owner().currentContext();
        }

        @Override
        public void onUncaughtException(Connection connection, Throwable error) {
            this.owner().onUncaughtException(connection, error);
        }

        /**
         * Forwards state changes to the current owner, except that
         * {@code DISCONNECTING} on an active, persistent channel releases the
         * connection to the pool and reports {@code RELEASED} to the previous owner.
         */
        @Override
        public void onStateChange(Connection connection, ConnectionObserver.State newState) {
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(connection.channel(), "onStateChange({}, {})"), connection, newState);
            }
            if (newState != ConnectionObserver.State.DISCONNECTING) {
                this.owner().onStateChange(connection, newState);
                return;
            }
            // Non-persistent but still open: close it and let the owner see the disconnect.
            if (!this.isPersistent() && this.channel.isActive()) {
                this.channel.close();
                this.owner().onStateChange(connection, ConnectionObserver.State.DISCONNECTING);
                return;
            }
            // Already closed: nothing to release, just forward.
            if (!this.channel.isActive()) {
                this.owner().onStateChange(connection, ConnectionObserver.State.DISCONNECTING);
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(connection.channel(), "Releasing channel"));
            }
            ConnectionObserver obs = this.channel.attr(OWNER).getAndSet(ConnectionObserver.emptyListener());
            if (this.pooledRef == null) {
                return;
            }
            // Both release outcomes finish the same way: complete onTerminate, then signal RELEASED.
            Runnable signalReleased = () -> {
                this.onTerminate.onComplete();
                obs.onStateChange(connection, ConnectionObserver.State.RELEASED);
            };
            this.pooledRef.release().subscribe(null, t -> {
                if (log.isDebugEnabled()) {
                    log.debug("Failed cleaning the channel from pool, now {} active connections and {} inactive connections", this.pool.metrics().acquiredSize(), this.pool.metrics().idleSize(), t);
                }
                signalReleased.run();
            }, () -> {
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(this.pooledRef.poolable().channel, "Channel cleaned, now {} active connections and {} inactive connections"), this.pool.metrics().acquiredSize(), this.pool.metrics().idleSize());
                }
                signalReleased.run();
            });
        }

        @Override
        public String toString() {
            return "PooledConnection{channel=" + this.channel + "}";
        }
    }

    /**
     * Placeholder owner that buffers connection events in a queue until a real
     * owner takes over the channel and drains {@code pendingQueue}.
     */
    static final class PendingConnectionObserver
    implements ConnectionObserver {
        final Queue<Pending> pendingQueue = Queues.<Pending>unbounded(4).get();

        PendingConnectionObserver() {
        }

        /** Buffers an error event (state is left {@code null}). */
        @Override
        public void onUncaughtException(Connection connection, Throwable error) {
            Pending event = new Pending(connection, error, null);
            this.pendingQueue.add(event);
        }

        /** Buffers a state-change event (error is left {@code null}). */
        @Override
        public void onStateChange(Connection connection, ConnectionObserver.State newState) {
            Pending event = new Pending(connection, null, newState);
            this.pendingQueue.add(event);
        }

        /** One buffered event: the connection plus either an error or a state. */
        static class Pending {
            final Connection connection;
            final Throwable error;
            final ConnectionObserver.State state;

            Pending(Connection connection, @Nullable Throwable error, @Nullable ConnectionObserver.State state) {
                this.state = state;
                this.error = error;
                this.connection = connection;
            }
        }
    }

    /**
     * Owns one pool and knows how to open new channels for it.
     *
     * <p>The pool is built from the given {@link PoolFactory} with
     * {@link #connectChannel()} as its allocator.
     */
    static final class PooledConnectionAllocator {
        final InstrumentedPool<PooledConnection> pool;
        final Bootstrap bootstrap;
        final ChannelOperations.OnSetup opsFactory;

        PooledConnectionAllocator(Bootstrap b, PoolFactory provider, ChannelOperations.OnSetup opsFactory) {
            this.bootstrap = b.clone();
            this.opsFactory = opsFactory;
            this.pool = provider.newPool(this.connectChannel());
        }

        /**
         * Returns a publisher that, per subscription, connects a clone of the
         * bootstrap with a {@link PooledConnectionInitializer} as handler and
         * signals the sink from the connect future.
         */
        Publisher<PooledConnection> connectChannel() {
            return Mono.create(sink -> {
                Bootstrap connector = this.bootstrap.clone();
                PooledConnectionInitializer initializer = new PooledConnectionInitializer(sink);
                ChannelFuture connecting = connector.handler(initializer).connect();
                if (!connecting.isDone()) {
                    connecting.addListener(initializer);
                    return;
                }
                // Already finished: report the outcome right away.
                initializer.operationComplete(connecting);
            });
        }

        /**
         * Temporary handler for a new channel. When added to the pipeline it wraps
         * the channel in a {@link PooledConnection}, installs the finalized bootstrap
         * handler and removes itself; as connect listener it reports the outcome to the sink.
         */
        final class PooledConnectionInitializer
        implements ChannelHandler,
        ChannelFutureListener {
            final MonoSink<PooledConnection> sink;
            PooledConnection pooledConnection;

            PooledConnectionInitializer(MonoSink<PooledConnection> sink) {
                this.sink = sink;
            }

            public void handlerAdded(ChannelHandlerContext ctx) {
                Channel ch = ctx.channel();
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(ch, "Created a new pooled channel, now {} active connections and {} inactive connections"), PooledConnectionAllocator.this.pool.metrics().acquiredSize(), PooledConnectionAllocator.this.pool.metrics().idleSize());
                }
                PooledConnection created = new PooledConnection(ch, PooledConnectionAllocator.this.pool);
                this.pooledConnection = created;
                created.bind();
                Bootstrap b = PooledConnectionAllocator.this.bootstrap.clone();
                BootstrapHandlers.finalizeHandler(b, PooledConnectionAllocator.this.opsFactory, created);
                ch.pipeline().addFirst(b.config().handler());
                ctx.pipeline().remove(this);
            }

            public void handlerRemoved(ChannelHandlerContext ctx) {
            }

            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                ctx.pipeline().remove(this);
            }

            public void operationComplete(ChannelFuture future) {
                if (!future.isSuccess()) {
                    this.sink.error(future.cause());
                    return;
                }
                this.sink.success(this.pooledConnection);
            }
        }
    }
}

