package me.prettyprint.cassandra.connection;

import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import me.prettyprint.cassandra.connection.client.HClient;
import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.hector.api.exceptions.HInactivePoolException;
import me.prettyprint.hector.api.exceptions.HPoolExhaustedException;
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.exceptions.HectorTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:me/prettyprint/cassandra/connection/ConcurrentHClientPool.class */
public class ConcurrentHClientPool implements HClientPool {
    private static final Logger log = LoggerFactory.getLogger(ConcurrentHClientPool.class);
    private final ArrayBlockingQueue<HClient> availableClientQueue;
    private final CassandraHost cassandraHost;
    private final long maxWaitTimeWhenExhausted;
    private final HClientFactory clientFactory;
    private final AtomicInteger activeClientsCount = new AtomicInteger(0);
    private final AtomicInteger realActiveClientsCount = new AtomicInteger(0);
    private final AtomicInteger numBlocked = new AtomicInteger();
    private final AtomicBoolean active = new AtomicBoolean(true);

    public ConcurrentHClientPool(HClientFactory hClientFactory, CassandraHost cassandraHost) {
        this.clientFactory = hClientFactory;
        this.cassandraHost = cassandraHost;
        this.availableClientQueue = new ArrayBlockingQueue<>(this.cassandraHost.getMaxActive(), true);
        this.maxWaitTimeWhenExhausted = this.cassandraHost.getMaxWaitTimeWhenExhausted() < 0 ? 0L : this.cassandraHost.getMaxWaitTimeWhenExhausted();
        for (int i = 0; i < this.cassandraHost.getMaxActive() / 3; i++) {
            this.availableClientQueue.add(createClient());
        }
        if (log.isDebugEnabled()) {
            log.debug("Concurrent Host pool started with {} active clients; max: {} exhausted wait: {}", new Object[]{Integer.valueOf(getNumIdle()), Integer.valueOf(this.cassandraHost.getMaxActive()), Long.valueOf(this.maxWaitTimeWhenExhausted)});
        }
    }

    @Override // me.prettyprint.cassandra.connection.HClientPool
    public HClient borrowClient() throws HectorException {
        if (!this.active.get()) {
            throw new HInactivePoolException("Attempt to borrow on in-active pool: " + getName());
        }
        HClient poll = this.availableClientQueue.poll();
        int incrementAndGet = this.activeClientsCount.incrementAndGet();
        if (poll == null) {
            try {
                poll = incrementAndGet <= this.cassandraHost.getMaxActive() ? createClient() : waitForConnection();
            } catch (RuntimeException e) {
                this.activeClientsCount.decrementAndGet();
                throw e;
            }
        }
        if (poll == null) {
            throw new HectorException("HConnectionManager returned a null client after aquisition - are we shutting down?");
        }
        this.realActiveClientsCount.incrementAndGet();
        return poll;
    }

    private HClient waitForConnection() {
        HClient hClient = null;
        this.numBlocked.incrementAndGet();
        if (log.isDebugEnabled()) {
            log.debug("blocking on queue - current block count {}", Integer.valueOf(this.numBlocked.get()));
        }
        try {
            if (this.maxWaitTimeWhenExhausted != 0) {
                try {
                    hClient = this.availableClientQueue.poll(this.maxWaitTimeWhenExhausted, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    log.error("Cassandra client acquisition interrupted", e);
                }
                if (hClient == null) {
                    throw new HPoolExhaustedException(String.format("maxWaitTimeWhenExhausted exceeded for thread %s on host %s", Thread.currentThread().getName(), this.cassandraHost.getName()));
                }
                return hClient;
            }
            while (hClient == null && this.active.get()) {
                try {
                    hClient = this.availableClientQueue.poll(100L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e2) {
                    log.error("InterruptedException poll operation on retry forever", e2);
                }
            }
            return hClient;
        } finally {
            this.numBlocked.decrementAndGet();
        }
    }

    private HClient createClient() {
        return this.clientFactory.createClient(this.cassandraHost).open();
    }

    @Override // me.prettyprint.cassandra.connection.HClientPool
    public void shutdown() {
        if (!this.active.compareAndSet(true, false)) {
            throw new IllegalArgumentException("shutdown() called for inactive pool: " + getName());
        }
        log.info("Shutdown triggered on {}", getName());
        HashSet hashSet = new HashSet();
        this.availableClientQueue.drainTo(hashSet);
        if (hashSet.size() > 0) {
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                ((HClient) it.next()).close();
            }
        }
        log.info("Shutdown complete on {}", getName());
    }

    @Override // me.prettyprint.cassandra.connection.HClientPool
    public CassandraHost getCassandraHost() {
        return this.cassandraHost;
    }

    @Override // me.prettyprint.cassandra.connection.PoolMetric
    public String getName() {
        return String.format("<ConcurrentCassandraClientPoolByHost>:{%s}", this.cassandraHost.getName());
    }

    @Override // me.prettyprint.cassandra.connection.PoolMetric
    public int getNumActive() {
        return this.realActiveClientsCount.get();
    }

    @Override // me.prettyprint.cassandra.connection.HClientPool
    public int getNumBeforeExhausted() {
        return this.cassandraHost.getMaxActive() - this.realActiveClientsCount.get();
    }

    @Override // me.prettyprint.cassandra.connection.PoolMetric
    public int getNumBlockedThreads() {
        return this.numBlocked.intValue();
    }

    @Override // me.prettyprint.cassandra.connection.PoolMetric
    public int getNumIdle() {
        return this.availableClientQueue.size();
    }

    @Override // me.prettyprint.cassandra.connection.HClientPool
    public boolean isExhausted() {
        return getNumBeforeExhausted() == 0;
    }

    @Override // me.prettyprint.cassandra.connection.HClientPool
    public int getMaxActive() {
        return this.cassandraHost.getMaxActive();
    }

    @Override // me.prettyprint.cassandra.connection.PoolMetric
    public boolean getIsActive() {
        return this.active.get();
    }

    @Override // me.prettyprint.cassandra.connection.HClientPool
    public String getStatusAsString() {
        return String.format("%s; IsActive?: %s; Active: %d; Blocked: %d; Idle: %d; NumBeforeExhausted: %d", getName(), Boolean.valueOf(getIsActive()), Integer.valueOf(getNumActive()), Integer.valueOf(getNumBlockedThreads()), Integer.valueOf(getNumIdle()), Integer.valueOf(getNumBeforeExhausted()));
    }

    @Override // me.prettyprint.cassandra.connection.HClientPool
    public void releaseClient(HClient hClient) throws HectorException {
        boolean isOpen = hClient.isOpen();
        if (!isOpen) {
            try {
                addClientToPoolGently(createClient());
            } catch (HectorTransportException e) {
                log.error("Transport exception in re-opening client in release on {}", getName());
            }
        } else if (this.active.get()) {
            addClientToPoolGently(hClient);
        } else {
            log.info("Open client {} released to in-active pool for host {}. Closing.", hClient, this.cassandraHost);
            hClient.close();
        }
        this.realActiveClientsCount.decrementAndGet();
        this.activeClientsCount.decrementAndGet();
        if (log.isDebugEnabled()) {
            log.debug("Status of releaseClient {} to queue: {}", hClient.toString(), Boolean.valueOf(isOpen));
        }
    }

    private void addClientToPoolGently(HClient hClient) {
        try {
            this.availableClientQueue.add(hClient);
        } catch (IllegalStateException e) {
            log.warn("Capacity hit adding client back to queue. Closing extra");
            hClient.close();
        }
    }
}
