/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.pubsub.PublisherLease;

/**
 * A pool of reusable {@link PublisherLease} objects, each wrapping its own
 * {@link KafkaProducer} built from a shared set of Kafka properties.
 *
 * <p>Leases are handed out by {@link #obtainPublisher()}. Calling {@code close()} on a
 * lease returns it to the pool for reuse, unless the lease reports itself as poisoned or
 * the pool has been closed, in which case the lease's own {@code close()} logic runs
 * instead.
 *
 * <p>Thread-safety: idle leases are kept in a {@link LinkedBlockingQueue}; the closed
 * flag is only written, and leases are only returned to the queue, while holding this
 * pool's monitor, so a lease can never be parked in a pool that has already been drained.
 */
public class PublisherPool implements Closeable {
    private final ComponentLog logger;
    /** Idle leases available for reuse. */
    private final BlockingQueue<PublisherLease> publisherQueue;
    /** Configuration passed to every {@link KafkaProducer} this pool creates. */
    private final Map<String, Object> kafkaProperties;
    private final int maxMessageSize;
    private final long maxAckWaitMillis;
    /** Set once by {@link #close()}; never reset. */
    private volatile boolean closed = false;

    /**
     * Creates an empty pool. No producer is created until the first call to
     * {@link #obtainPublisher()}.
     *
     * @param kafkaProperties  properties handed to each new {@link KafkaProducer}; the map is
     *                         stored by reference, not copied
     * @param logger           logger passed through to every lease
     * @param maxMessageSize   passed through to every lease
     * @param maxAckWaitMillis passed through to every lease
     */
    PublisherPool(Map<String, Object> kafkaProperties, ComponentLog logger, int maxMessageSize, long maxAckWaitMillis) {
        this.logger = logger;
        this.publisherQueue = new LinkedBlockingQueue<PublisherLease>();
        this.kafkaProperties = kafkaProperties;
        this.maxMessageSize = maxMessageSize;
        this.maxAckWaitMillis = maxAckWaitMillis;
    }

    /**
     * Returns an idle lease from the pool, or creates a new one if none is available.
     *
     * @return a lease; calling {@code close()} on it hands it back to this pool
     * @throws IllegalStateException if the pool has already been closed
     */
    public PublisherLease obtainPublisher() {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection Pool is closed");
        }
        PublisherLease lease = this.publisherQueue.poll();
        return lease != null ? lease : this.createLease();
    }

    /**
     * Builds a new producer and wraps it in a lease whose {@code close()} returns the
     * lease to this pool instead of closing it, whenever that is still possible.
     */
    private PublisherLease createLease() {
        // NOTE(review): raw types are kept because the generic signature of the
        // PublisherLease constructor is not visible from this file.
        KafkaProducer producer = new KafkaProducer(this.kafkaProperties);
        return new PublisherLease((Producer)producer, this.maxMessageSize, this.maxAckWaitMillis, this.logger) {

            @Override
            public void close() {
                boolean pooled = false;
                if (!this.isPoisoned()) {
                    // The closed check and the offer must be atomic with respect to
                    // PublisherPool.close(). Otherwise the pool could be closed and drained
                    // between the two steps, leaving this lease stranded in the queue and
                    // never actually closed.
                    synchronized (PublisherPool.this) {
                        if (!PublisherPool.this.closed) {
                            pooled = PublisherPool.this.publisherQueue.offer(this);
                        }
                    }
                }
                if (!pooled) {
                    // Poisoned, pool closed, or the queue refused the lease: really close it.
                    // Done outside the pool monitor so other threads are not blocked on it.
                    super.close();
                }
            }
        };
    }

    /**
     * @return {@code true} once {@link #close()} has been called on this pool
     */
    public synchronized boolean isClosed() {
        return this.closed;
    }

    /**
     * Marks the pool as closed and closes every lease that is currently idle in the pool.
     * Leases that are checked out at this moment are closed when their holder calls
     * {@code close()} on them, because they can no longer be returned to the pool.
     */
    @Override
    public synchronized void close() {
        this.closed = true;
        PublisherLease lease;
        while ((lease = this.publisherQueue.poll()) != null) {
            // closed is already true, so the lease's close() takes the real-close path.
            lease.close();
        }
    }

    /**
     * @return the number of idle leases currently held in the pool
     */
    protected int available() {
        return this.publisherQueue.size();
    }
}

