package org.springframework.integration.kafka.core;

import com.gs.collections.api.block.function.Function;
import com.gs.collections.api.block.predicate.Predicate;
import com.gs.collections.api.partition.PartitionIterable;
import com.gs.collections.impl.block.factory.Functions;
import com.gs.collections.impl.map.mutable.UnifiedMap;
import com.gs.collections.impl.utility.Iterate;
import com.gs.collections.impl.utility.ListIterate;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import kafka.client.ClientUtils$;
import kafka.common.ErrorMapping;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/springframework/integration/kafka/core/DefaultConnectionFactory.class */
/**
 * Default {@link ConnectionFactory} implementation. It keeps one {@link Connection} per
 * {@link BrokerAddress} and a {@link MetadataCache} of topic metadata, both guarded by a single
 * {@link ReadWriteLock}.
 *
 * <p>Lookups first consult the cache under the read lock. On a miss they release the read lock,
 * take the write lock, re-check, and only then populate the cache. The read lock is always
 * released before the write lock is requested.
 */
public class DefaultConnectionFactory implements InitializingBean, ConnectionFactory, DisposableBean {
    private static final Log log = LogFactory.getLog(DefaultConnectionFactory.class);

    /** Accepts only topic metadata whose error code equals {@code ErrorMapping.NoError()}. */
    public static final Predicate<TopicMetadata> errorlessTopicMetadataPredicate = new ErrorlessTopicMetadataPredicate();

    private final Configuration configuration;
    private final GetBrokersByPartitionFunction getBrokersByPartitionFunction = new GetBrokersByPartitionFunction();
    private final AtomicReference<MetadataCache> metadataCacheHolder =
            new AtomicReference<>(new MetadataCache(Collections.<TopicMetadata>emptySet()));
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final UnifiedMap<BrokerAddress, Connection> kafkaBrokersCache = UnifiedMap.newMap();

    /** Predicate selecting topic metadata that carries no error code. */
    private static class ErrorlessTopicMetadataPredicate implements Predicate<TopicMetadata> {
        private ErrorlessTopicMetadataPredicate() {
        }

        @Override
        public boolean accept(TopicMetadata topicMetadata) {
            return topicMetadata.errorCode() == ErrorMapping.NoError();
        }
    }

    /**
     * Maps a partition to the leader currently recorded in the metadata cache. It reads the cache
     * as-is: no locking and no refresh, so the result is whatever the cache returns for the partition.
     */
    private class GetBrokersByPartitionFunction implements Function<Partition, BrokerAddress> {
        private GetBrokersByPartitionFunction() {
        }

        @Override
        public BrokerAddress valueOf(Partition partition) {
            return getMetadataCache().getLeader(partition);
        }
    }

    /**
     * @param configuration the Kafka configuration used to create connections and fetch metadata;
     *                      checked for {@code null} in {@link #afterPropertiesSet()}
     */
    public DefaultConnectionFactory(Configuration configuration) {
        this.configuration = configuration;
    }

    /**
     * @return the configuration this factory was created with
     */
    public Configuration getConfiguration() {
        return this.configuration;
    }

    /**
     * Verifies that a configuration has been supplied.
     *
     * @throws Exception declared by {@link InitializingBean}; a missing configuration is reported
     *                   by {@code Assert.notNull}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.configuration, "Kafka configuration cannot be empty");
    }

    /**
     * Closes every cached connection.
     *
     * @throws Exception declared by {@link DisposableBean}
     */
    @Override
    public void destroy() throws Exception {
        // Iterating the map itself yields its values (the connections).
        for (Connection connection : this.kafkaBrokersCache) {
            connection.close();
        }
    }

    /**
     * Looks up the cached leader of each given partition.
     *
     * @param iterable the partitions to resolve
     * @return a map from each partition to the leader found in the metadata cache
     */
    @Override // org.springframework.integration.kafka.core.ConnectionFactory
    public Map<Partition, BrokerAddress> getLeaders(Iterable<Partition> iterable) {
        return Iterate.toMap(iterable, Functions.<Partition>getPassThru(), this.getBrokersByPartitionFunction);
    }

    /**
     * Returns the leader of the given partition, refreshing the metadata of its topic once if the
     * leader is not cached.
     *
     * @param partition the partition to resolve
     * @return the leader broker address, never {@code null}
     * @throws PartitionNotFoundException if no leader is known even after the refresh
     */
    @Override // org.springframework.integration.kafka.core.ConnectionFactory
    public BrokerAddress getLeader(Partition partition) {
        BrokerAddress leader;
        this.lock.readLock().lock();
        try {
            leader = getMetadataCache().getLeader(partition);
        } finally {
            // Released exactly once, and before the write lock is requested below.
            this.lock.readLock().unlock();
        }
        if (leader == null) {
            this.lock.writeLock().lock();
            try {
                // Re-check: another thread may have refreshed while we waited for the write lock.
                leader = getMetadataCache().getLeader(partition);
                if (leader == null) {
                    refreshMetadata(Collections.singleton(partition.getTopic()));
                    leader = getMetadataCache().getLeader(partition);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        }
        if (leader == null) {
            throw new PartitionNotFoundException(partition);
        }
        return leader;
    }

    /**
     * Returns the cached connection for the given broker, creating and caching one if none exists.
     *
     * @param brokerAddress the broker to connect to
     * @return the connection associated with {@code brokerAddress}
     */
    @Override // org.springframework.integration.kafka.core.ConnectionFactory
    public Connection connect(BrokerAddress brokerAddress) {
        Connection connection;
        this.lock.readLock().lock();
        try {
            connection = this.kafkaBrokersCache.get(brokerAddress);
        } finally {
            this.lock.readLock().unlock();
        }
        if (connection == null) {
            this.lock.writeLock().lock();
            try {
                // Re-check so that two racing callers do not create two connections.
                connection = this.kafkaBrokersCache.get(brokerAddress);
                if (connection == null) {
                    connection = new DefaultConnection(brokerAddress,
                            this.configuration.getClientId(),
                            this.configuration.getBufferSize(),
                            this.configuration.getSocketTimeout(),
                            this.configuration.getMinBytes(),
                            this.configuration.getMaxWait());
                    this.kafkaBrokersCache.put(brokerAddress, connection);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        }
        return connection;
    }

    /**
     * Fetches metadata for the given topics from the configured brokers and merges the error-free
     * entries into the metadata cache. Topics whose metadata carries an error are logged at INFO
     * level and otherwise ignored.
     *
     * @param collection the names of the topics to refresh
     */
    @Override // org.springframework.integration.kafka.core.ConnectionFactory
    public void refreshMetadata(Collection<String> collection) {
        this.lock.writeLock().lock();
        try {
            // Comma-separated broker list built from the toString() of each configured address.
            String brokerList = ListIterate.collect(this.configuration.getBrokerAddresses(), Functions.getToString())
                    .makeString(",");
            // NOTE(review): the trailing 0 is presumably the request correlation id - confirm
            // against ClientUtils.fetchTopicMetadata.
            TopicMetadataResponse response = new TopicMetadataResponse(
                    ClientUtils$.MODULE$.fetchTopicMetadata(
                            JavaConversions.asScalaSet(new HashSet<String>(collection)),
                            ClientUtils$.MODULE$.parseBrokerList(brokerList),
                            this.configuration.getClientId(),
                            this.configuration.getFetchMetadataTimeout(),
                            0));
            PartitionIterable<TopicMetadata> byErrorState =
                    Iterate.partition(response.topicsMetadata(), errorlessTopicMetadataPredicate);
            this.metadataCacheHolder.set(this.metadataCacheHolder.get().merge(byErrorState.getSelected()));
            if (log.isInfoEnabled()) {
                for (TopicMetadata topicMetadata : byErrorState.getRejected()) {
                    log.info(String.format("No metadata could be retrieved for '%s'", topicMetadata.topic()),
                            ErrorMapping.exceptionFor(topicMetadata.errorCode()));
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Closes the cached connection for the given broker, if any, and evicts it from the cache.
     *
     * @param brokerAddress the broker whose connection should be closed
     */
    @Override // org.springframework.integration.kafka.core.ConnectionFactory
    public void disconnect(BrokerAddress brokerAddress) {
        this.lock.writeLock().lock();
        try {
            // Evict as well as close; otherwise connect() would keep returning the closed instance.
            Connection connection = this.kafkaBrokersCache.remove(brokerAddress);
            if (connection != null) {
                connection.close();
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Returns the partitions of the given topic, refreshing the topic's metadata once if they are
     * not cached.
     *
     * @param str the topic name
     * @return the partitions of the topic, never {@code null}
     * @throws TopicNotFoundException if no partitions are known even after the refresh
     */
    @Override // org.springframework.integration.kafka.core.ConnectionFactory
    public Collection<Partition> getPartitions(String str) {
        Collection<Partition> partitions;
        this.lock.readLock().lock();
        try {
            partitions = getMetadataCache().getPartitions(str);
        } finally {
            this.lock.readLock().unlock();
        }
        if (partitions == null) {
            this.lock.writeLock().lock();
            try {
                // Re-check: another thread may have refreshed while we waited for the write lock.
                partitions = getMetadataCache().getPartitions(str);
                if (partitions == null) {
                    refreshMetadata(Collections.singleton(str));
                    partitions = getMetadataCache().getPartitions(str);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        }
        if (partitions == null) {
            throw new TopicNotFoundException(str);
        }
        return partitions;
    }

    /** @return the current metadata cache snapshot */
    private MetadataCache getMetadataCache() {
        return this.metadataCacheHolder.get();
    }
}
