package pl.allegro.tech.hermes.management.config.kafka;

import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import kafka.zookeeper.ZooKeeperClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.utils.Time;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.broker.ZookeeperBrokerStorage;
import pl.allegro.tech.hermes.common.kafka.KafkaConsumerPool;
import pl.allegro.tech.hermes.common.kafka.KafkaConsumerPoolConfig;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.common.message.wrapper.MessageContentWrapper;
import pl.allegro.tech.hermes.management.config.SubscriptionProperties;
import pl.allegro.tech.hermes.management.config.TopicProperties;
import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.LogEndOffsetChecker;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.OffsetsAvailableChecker;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit.KafkaRetransmissionService;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperRepositoryManager;
import pl.allegro.tech.hermes.schema.SchemaRepository;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

@EnableConfigurationProperties({KafkaClustersProperties.class})
@Configuration
/* loaded from: input_file:pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.class */
public class KafkaConfiguration implements MultipleDcKafkaNamesMappersFactory {
    private static final String ZOOKEEPER_METRIC_GROUP = "zookeeper-metrics-group";
    private static final String ZOOKEEPER_METRIC_TYPE = "zookeeper";

    @Autowired
    KafkaClustersProperties kafkaClustersProperties;

    @Autowired
    TopicProperties topicProperties;

    @Autowired
    SubscriptionProperties subscriptionProperties;

    @Autowired
    MessageContentWrapper messageContentWrapper;

    @Autowired
    ZookeeperRepositoryManager zookeeperRepositoryManager;

    @Autowired
    MultiDatacenterRepositoryCommandExecutor multiDcExecutor;
    private final List<ZooKeeperClient> zkClients = new ArrayList();
    private final List<CuratorFramework> curators = new ArrayList();

    @Bean
    MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, SchemaRepository schemaRepository, Clock clock) {
        List repositories = this.zookeeperRepositoryManager.getRepositories(SubscriptionOffsetChangeIndicator.class);
        return new MultiDCAwareService((List) this.kafkaClustersProperties.getClusters().stream().map(kafkaProperties -> {
            KafkaNamesMapper mapper = kafkaNamesMappers.getMapper(kafkaProperties.getQualifiedClusterName());
            KafkaZkClient kafkaZkClient = kafkaZkClient(zooKeeperClient(kafkaProperties));
            AdminZkClient adminZkClient = adminZkClient(kafkaZkClient);
            AdminClient brokerAdminClient = brokerAdminClient(kafkaProperties);
            BrokerStorage brokersStorage = brokersStorage(curatorFramework(kafkaProperties), kafkaZkClient);
            KafkaBrokerTopicManagement kafkaBrokerTopicManagement = new KafkaBrokerTopicManagement(this.topicProperties, adminZkClient, kafkaZkClient, mapper);
            KafkaConsumerPool kafkaConsumersPool = kafkaConsumersPool(kafkaProperties, brokersStorage);
            KafkaRawMessageReader kafkaRawMessageReader = new KafkaRawMessageReader(kafkaConsumersPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis());
            KafkaRetransmissionService kafkaRetransmissionService = new KafkaRetransmissionService(brokersStorage, getRepository(repositories, kafkaProperties), kafkaConsumersPool, mapper);
            return new BrokersClusterService(kafkaProperties.getQualifiedClusterName(), new KafkaSingleMessageReader(kafkaRawMessageReader, schemaRepository, new JsonAvroConverter()), kafkaRetransmissionService, kafkaBrokerTopicManagement, mapper, new OffsetsAvailableChecker(kafkaConsumersPool, brokersStorage), new LogEndOffsetChecker(kafkaConsumersPool), brokerAdminClient);
        }).collect(Collectors.toList()), clock, Duration.ofMillis(this.subscriptionProperties.getIntervalBetweenCheckinIfOffsetsMovedInMillis()), Duration.ofSeconds(this.subscriptionProperties.getOffsetsMovedTimeoutInSeconds()), this.multiDcExecutor);
    }

    private SubscriptionOffsetChangeIndicator getRepository(List<DatacenterBoundRepositoryHolder<SubscriptionOffsetChangeIndicator>> list, KafkaProperties kafkaProperties) {
        return list.size() == 1 ? list.get(0).getRepository() : list.stream().filter(datacenterBoundRepositoryHolder -> {
            return kafkaProperties.getDatacenter().equals(datacenterBoundRepositoryHolder.getDatacenterName());
        }).findFirst().orElseThrow(() -> {
            return new IllegalArgumentException(String.format("Kafka cluster dc name '%s' not matched with Zookeeper dc names: %s", kafkaProperties.getDatacenter(), list.stream().map(datacenterBoundRepositoryHolder2 -> {
                return datacenterBoundRepositoryHolder2.getDatacenterName();
            }).collect(Collectors.joining(","))));
        }).getRepository();
    }

    @ConditionalOnMissingBean
    @Bean
    KafkaNamesMappers kafkaNameMappers() {
        return createDefaultKafkaNamesMapper(this.kafkaClustersProperties);
    }

    @PreDestroy
    public void shutdown() {
        this.curators.forEach((v0) -> {
            v0.close();
        });
        this.zkClients.forEach((v0) -> {
            v0.close();
        });
    }

    private AdminZkClient adminZkClient(KafkaZkClient kafkaZkClient) {
        return new AdminZkClient(kafkaZkClient);
    }

    private KafkaZkClient kafkaZkClient(ZooKeeperClient zooKeeperClient) {
        return new KafkaZkClient(zooKeeperClient, false, Time.SYSTEM);
    }

    private ZooKeeperClient zooKeeperClient(KafkaProperties kafkaProperties) {
        ZooKeeperClient zooKeeperClient = new ZooKeeperClient(kafkaProperties.getConnectionString(), kafkaProperties.getSessionTimeoutMillis(), kafkaProperties.getConnectionTimeoutMillis(), kafkaProperties.getMaxInflight(), Time.SYSTEM, ZOOKEEPER_METRIC_GROUP, ZOOKEEPER_METRIC_TYPE);
        this.zkClients.add(zooKeeperClient);
        zooKeeperClient.waitUntilConnected();
        return zooKeeperClient;
    }

    private CuratorFramework curatorFramework(KafkaProperties kafkaProperties) {
        CuratorFramework newClient = CuratorFrameworkFactory.newClient(kafkaProperties.getConnectionString(), new RetryNTimes(kafkaProperties.getRetryTimes(), kafkaProperties.getRetrySleepMillis()));
        newClient.start();
        this.curators.add(newClient);
        return newClient;
    }

    private BrokerStorage brokersStorage(CuratorFramework curatorFramework, KafkaZkClient kafkaZkClient) {
        return new ZookeeperBrokerStorage(curatorFramework, kafkaZkClient);
    }

    private KafkaConsumerPool kafkaConsumersPool(KafkaProperties kafkaProperties, BrokerStorage brokerStorage) {
        return new KafkaConsumerPool(new KafkaConsumerPoolConfig(kafkaProperties.getKafkaConsumer().getCacheExpirationSeconds(), kafkaProperties.getKafkaConsumer().getBufferSizeBytes(), kafkaProperties.getKafkaConsumer().getFetchMaxWaitMillis(), kafkaProperties.getKafkaConsumer().getFetchMinBytes(), kafkaProperties.getKafkaConsumer().getNamePrefix(), kafkaProperties.getKafkaConsumer().getConsumerGroupName()), brokerStorage);
    }

    private AdminClient brokerAdminClient(KafkaProperties kafkaProperties) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getBootstrapKafkaServer());
        properties.put("request.timeout.ms", Integer.valueOf(kafkaProperties.getKafkaServerRequestTimeoutMillis()));
        return AdminClient.create(properties);
    }
}
