package pl.allegro.tech.hermes.management.config.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.avro.Schema;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.common.admin.AdminTool;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.broker.ZookeeperBrokerStorage;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.SimpleConsumerPool;
import pl.allegro.tech.hermes.common.kafka.SimpleConsumerPoolConfig;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.common.message.wrapper.MessageContentWrapper;
import pl.allegro.tech.hermes.domain.topic.schema.SchemaRepository;
import pl.allegro.tech.hermes.management.config.TopicProperties;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.OffsetsAvailableChecker;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit.KafkaRetransmissionService;

@EnableConfigurationProperties({KafkaClustersProperties.class})
@Configuration
/* loaded from: input_file:pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.class */
public class KafkaConfiguration {

    @Autowired
    KafkaClustersProperties kafkaClustersProperties;

    @Autowired
    TopicProperties topicProperties;

    @Autowired
    ObjectMapper mapper;

    @Autowired
    MessageContentWrapper messageContentWrapper;

    @Autowired
    SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator;

    @Autowired
    AdminTool adminTool;

    @Autowired
    SchemaRepository<Schema> avroSchemaRepository;
    private final List<ZkClient> zkClients = new ArrayList();
    private final List<CuratorFramework> curators = new ArrayList();

    @Bean
    MultiDCAwareService multiDCAwareService() {
        return new MultiDCAwareService((List) this.kafkaClustersProperties.getClusters().stream().map(kafkaProperties -> {
            KafkaNamesMapper kafkaNamesMapper = StringUtils.isEmpty(kafkaProperties.getNamespace()) ? new KafkaNamesMapper(this.kafkaClustersProperties.getDefaultNamespace()) : new KafkaNamesMapper(kafkaProperties.getNamespace());
            BrokerStorage brokersStorage = brokersStorage(curatorFramework(kafkaProperties));
            KafkaBrokerTopicManagement kafkaBrokerTopicManagement = new KafkaBrokerTopicManagement(this.topicProperties, zkClient(kafkaProperties), kafkaNamesMapper);
            SimpleConsumerPool simpleConsumersPool = simpleConsumersPool(kafkaProperties, brokersStorage);
            KafkaRawMessageReader kafkaRawMessageReader = new KafkaRawMessageReader(simpleConsumersPool);
            return new BrokersClusterService(kafkaProperties.getClusterName(), new KafkaSingleMessageReader(kafkaRawMessageReader, this.avroSchemaRepository), new KafkaRetransmissionService(brokersStorage, kafkaRawMessageReader, this.messageContentWrapper, this.subscriptionOffsetChangeIndicator, simpleConsumersPool, kafkaNamesMapper), kafkaBrokerTopicManagement, kafkaNamesMapper, new OffsetsAvailableChecker(simpleConsumersPool, brokersStorage));
        }).collect(Collectors.toList()), this.adminTool);
    }

    @PreDestroy
    public void shutdown() {
        this.curators.forEach((v0) -> {
            v0.close();
        });
        this.zkClients.forEach((v0) -> {
            v0.close();
        });
    }

    private ZkClient zkClient(KafkaProperties kafkaProperties) {
        ZkClient zkClient = new ZkClient(kafkaProperties.getConnectionString(), kafkaProperties.getSessionTimeout(), kafkaProperties.getConnectionTimeout(), ZKStringSerializer$.MODULE$);
        zkClient.waitUntilConnected();
        this.zkClients.add(zkClient);
        return zkClient;
    }

    private CuratorFramework curatorFramework(KafkaProperties kafkaProperties) {
        CuratorFramework newClient = CuratorFrameworkFactory.newClient(kafkaProperties.getConnectionString(), new RetryNTimes(kafkaProperties.getRetryTimes(), kafkaProperties.getRetrySleep()));
        newClient.start();
        this.curators.add(newClient);
        return newClient;
    }

    private BrokerStorage brokersStorage(CuratorFramework curatorFramework) {
        return new ZookeeperBrokerStorage(curatorFramework, this.mapper);
    }

    private SimpleConsumerPool simpleConsumersPool(KafkaProperties kafkaProperties, BrokerStorage brokerStorage) {
        return new SimpleConsumerPool(new SimpleConsumerPoolConfig(kafkaProperties.getSimpleConsumer().getCacheExpiration(), kafkaProperties.getSimpleConsumer().getTimeout(), kafkaProperties.getSimpleConsumer().getBufferSize(), kafkaProperties.getSimpleConsumer().getNamePrefix()), brokerStorage);
    }
}
