/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.kafka.pubsub.ConsumerLease;

public class ConsumerPool
implements Closeable {
    private final BlockingQueue<SimpleConsumerLease> pooledLeases;
    private final List<String> topics;
    private final Map<String, Object> kafkaProperties;
    private final long maxWaitMillis;
    private final ComponentLog logger;
    private final byte[] demarcatorBytes;
    private final String keyEncoding;
    private final String securityProtocol;
    private final String bootstrapServers;
    private final AtomicLong consumerCreatedCountRef = new AtomicLong();
    private final AtomicLong consumerClosedCountRef = new AtomicLong();
    private final AtomicLong leasesObtainedCountRef = new AtomicLong();

    public ConsumerPool(int maxConcurrentLeases, byte[] demarcator, Map<String, Object> kafkaProperties, List<String> topics, long maxWaitMillis, String keyEncoding, String securityProtocol, String bootstrapServers, ComponentLog logger) {
        this.pooledLeases = new ArrayBlockingQueue<SimpleConsumerLease>(maxConcurrentLeases);
        this.maxWaitMillis = maxWaitMillis;
        this.logger = logger;
        this.demarcatorBytes = demarcator;
        this.keyEncoding = keyEncoding;
        this.securityProtocol = securityProtocol;
        this.bootstrapServers = bootstrapServers;
        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
        this.topics = Collections.unmodifiableList(topics);
    }

    public ConsumerLease obtainConsumer(ProcessSession session) {
        SimpleConsumerLease lease = (SimpleConsumerLease)this.pooledLeases.poll();
        if (lease == null) {
            Consumer<byte[], byte[]> consumer = this.createKafkaConsumer();
            this.consumerCreatedCountRef.incrementAndGet();
            lease = new SimpleConsumerLease(consumer);
            consumer.subscribe(this.topics, (ConsumerRebalanceListener)lease);
        }
        lease.setProcessSession(session);
        this.leasesObtainedCountRef.incrementAndGet();
        return lease;
    }

    public void retainConsumers() {
        Arrays.stream(this.pooledLeases.toArray(new ConsumerLease[0])).forEach(ConsumerLease::retainConnection);
    }

    protected Consumer<byte[], byte[]> createKafkaConsumer() {
        return new KafkaConsumer(this.kafkaProperties);
    }

    @Override
    public void close() {
        ArrayList leases = new ArrayList();
        this.pooledLeases.drainTo(leases);
        leases.stream().forEach(lease -> lease.close(true));
    }

    private void closeConsumer(Consumer<?, ?> consumer) {
        this.consumerClosedCountRef.incrementAndGet();
        try {
            consumer.unsubscribe();
        }
        catch (Exception e) {
            this.logger.warn("Failed while unsubscribing " + consumer, (Throwable)e);
        }
        try {
            consumer.close();
        }
        catch (Exception e) {
            this.logger.warn("Failed while closing " + consumer, (Throwable)e);
        }
    }

    PoolStats getPoolStats() {
        return new PoolStats(this.consumerCreatedCountRef.get(), this.consumerClosedCountRef.get(), this.leasesObtainedCountRef.get());
    }

    static final class PoolStats {
        final long consumerCreatedCount;
        final long consumerClosedCount;
        final long leasesObtainedCount;

        PoolStats(long consumerCreatedCount, long consumerClosedCount, long leasesObtainedCount) {
            this.consumerCreatedCount = consumerCreatedCount;
            this.consumerClosedCount = consumerClosedCount;
            this.leasesObtainedCount = leasesObtainedCount;
        }

        public String toString() {
            return "Created Consumers [" + this.consumerCreatedCount + "]\nClosed Consumers  [" + this.consumerClosedCount + "]\nLeases Obtained   [" + this.leasesObtainedCount + "]\n";
        }
    }

    private class SimpleConsumerLease
    extends ConsumerLease {
        private final Consumer<byte[], byte[]> consumer;
        private volatile ProcessSession session;
        private volatile boolean closedConsumer;

        private SimpleConsumerLease(Consumer<byte[], byte[]> consumer) {
            super(ConsumerPool.this.maxWaitMillis, consumer, ConsumerPool.this.demarcatorBytes, ConsumerPool.this.keyEncoding, ConsumerPool.this.securityProtocol, ConsumerPool.this.bootstrapServers, ConsumerPool.this.logger);
            this.consumer = consumer;
        }

        void setProcessSession(ProcessSession session) {
            this.session = session;
        }

        @Override
        public ProcessSession getProcessSession() {
            return this.session;
        }

        @Override
        public void close() {
            super.close();
            this.close(false);
        }

        public void close(boolean forceClose) {
            if (this.closedConsumer) {
                return;
            }
            super.close();
            if (this.session != null) {
                this.session.rollback();
                this.setProcessSession(null);
            }
            if (forceClose || this.isPoisoned() || !ConsumerPool.this.pooledLeases.offer(this)) {
                this.closedConsumer = true;
                ConsumerPool.this.closeConsumer(this.consumer);
            }
        }
    }
}

