package pl.allegro.tech.hermes.management.config.kafka;

import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.broker.KafkaBrokerStorage;
import pl.allegro.tech.hermes.common.kafka.KafkaConsumerPool;
import pl.allegro.tech.hermes.common.kafka.KafkaConsumerPoolConfig;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.common.message.wrapper.MessageContentWrapper;
import pl.allegro.tech.hermes.management.config.SubscriptionProperties;
import pl.allegro.tech.hermes.management.config.TopicProperties;
import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerGroupManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.LogEndOffsetChecker;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.NoOpConsumerGroupManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.OffsetsAvailableChecker;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit.KafkaRetransmissionService;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperRepositoryManager;
import pl.allegro.tech.hermes.schema.SchemaRepository;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

/**
 * Spring configuration that wires the Kafka-facing services of the management module.
 *
 * <p>For every cluster listed in {@link KafkaClustersProperties} it builds one
 * {@link BrokersClusterService} and exposes all of them through a single
 * {@link MultiDCAwareService} bean. It also provides a default {@link KafkaNamesMappers} bean
 * when the application context does not define one.
 */
@EnableConfigurationProperties({KafkaClustersProperties.class})
@Configuration
public class KafkaConfiguration implements MultipleDcKafkaNamesMappersFactory {

    @Autowired
    KafkaClustersProperties kafkaClustersProperties;

    @Autowired
    TopicProperties topicProperties;

    @Autowired
    SubscriptionProperties subscriptionProperties;

    @Autowired
    MessageContentWrapper messageContentWrapper;

    @Autowired
    ZookeeperRepositoryManager zookeeperRepositoryManager;

    @Autowired
    MultiDatacenterRepositoryCommandExecutor multiDcExecutor;

    /**
     * Creates the {@link MultiDCAwareService} bean backed by one {@link BrokersClusterService}
     * per configured Kafka cluster.
     *
     * @param kafkaNamesMappers source of the per-cluster {@link KafkaNamesMapper}
     * @param schemaRepository schema repository handed to each {@link KafkaSingleMessageReader}
     * @param clock clock passed through to the created service
     * @param jsonAvroConverter converter handed to each {@link KafkaSingleMessageReader}
     * @return the service aggregating all configured clusters
     * @throws IllegalArgumentException if a cluster's datacenter cannot be matched to a
     *     Zookeeper-bound repository (see {@code getRepository})
     */
    @Bean
    MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, SchemaRepository schemaRepository, Clock clock, JsonAvroConverter jsonAvroConverter) {
        List<DatacenterBoundRepositoryHolder<SubscriptionOffsetChangeIndicator>> repositories =
                this.zookeeperRepositoryManager.getRepositories(SubscriptionOffsetChangeIndicator.class);

        List<BrokersClusterService> clusters = this.kafkaClustersProperties.getClusters().stream()
                .map(kafkaProperties -> createBrokersClusterService(
                        kafkaProperties, kafkaNamesMappers, schemaRepository, jsonAvroConverter, repositories))
                .collect(Collectors.toList());

        return new MultiDCAwareService(
                clusters,
                clock,
                Duration.ofMillis(this.subscriptionProperties.getIntervalBetweenCheckinIfOffsetsMovedInMillis()),
                Duration.ofSeconds(this.subscriptionProperties.getOffsetsMovedTimeoutInSeconds()),
                this.multiDcExecutor);
    }

    /**
     * Builds the full set of collaborators for a single Kafka cluster and assembles them into a
     * {@link BrokersClusterService}.
     *
     * <p>One {@link AdminClient} and one {@link KafkaConsumerPool} are created per call and shared
     * by all collaborators of that cluster.
     */
    private BrokersClusterService createBrokersClusterService(
            KafkaProperties kafkaProperties,
            KafkaNamesMappers kafkaNamesMappers,
            SchemaRepository schemaRepository,
            JsonAvroConverter jsonAvroConverter,
            List<DatacenterBoundRepositoryHolder<SubscriptionOffsetChangeIndicator>> repositories) {
        KafkaNamesMapper mapper = kafkaNamesMappers.getMapper(kafkaProperties.getQualifiedClusterName());
        AdminClient adminClient = brokerAdminClient(kafkaProperties);
        BrokerStorage storage = brokersStorage(adminClient);
        KafkaBrokerTopicManagement topicManagement =
                new KafkaBrokerTopicManagement(this.topicProperties, adminClient, mapper);
        KafkaConsumerPool consumerPool =
                kafkaConsumersPool(kafkaProperties, storage, kafkaProperties.getBootstrapKafkaServer());
        KafkaRawMessageReader rawMessageReader =
                new KafkaRawMessageReader(consumerPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis());
        KafkaRetransmissionService retransmissionService =
                new KafkaRetransmissionService(storage, getRepository(repositories, kafkaProperties), consumerPool, mapper);

        return new BrokersClusterService(
                kafkaProperties.getQualifiedClusterName(),
                new KafkaSingleMessageReader(rawMessageReader, schemaRepository, jsonAvroConverter),
                retransmissionService,
                topicManagement,
                mapper,
                new OffsetsAvailableChecker(consumerPool, storage),
                new LogEndOffsetChecker(consumerPool),
                adminClient,
                createConsumerGroupManager(kafkaProperties, mapper));
    }

    /**
     * Chooses the consumer group manager for a cluster: a {@link KafkaConsumerGroupManager} when
     * manual consumer group creation is enabled in {@link SubscriptionProperties}, otherwise a
     * {@link NoOpConsumerGroupManager}.
     */
    private ConsumerGroupManager createConsumerGroupManager(KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper) {
        if (this.subscriptionProperties.isCreateConsumerGroupManuallyEnabled()) {
            return new KafkaConsumerGroupManager(
                    kafkaNamesMapper,
                    kafkaProperties.getQualifiedClusterName(),
                    kafkaProperties.getBootstrapKafkaServer(),
                    kafkaProperties);
        }
        return new NoOpConsumerGroupManager();
    }

    /**
     * Picks the {@link SubscriptionOffsetChangeIndicator} that belongs to the given cluster.
     *
     * <p>When exactly one repository holder exists it is used regardless of its datacenter name;
     * otherwise the first holder whose datacenter name equals the cluster's datacenter is used.
     *
     * @throws IllegalArgumentException if no holder matches the cluster's datacenter
     */
    private SubscriptionOffsetChangeIndicator getRepository(List<DatacenterBoundRepositoryHolder<SubscriptionOffsetChangeIndicator>> list, KafkaProperties kafkaProperties) {
        if (list.size() == 1) {
            return list.get(0).getRepository();
        }
        return list.stream()
                .filter(holder -> kafkaProperties.getDatacenter().equals(holder.getDatacenterName()))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(String.format(
                        "Kafka cluster dc name '%s' not matched with Zookeeper dc names: %s",
                        kafkaProperties.getDatacenter(),
                        list.stream()
                                .map(DatacenterBoundRepositoryHolder::getDatacenterName)
                                .collect(Collectors.joining(",")))))
                .getRepository();
    }

    /**
     * Provides the default {@link KafkaNamesMappers} built from the configured clusters, unless
     * another bean of that type is already defined.
     */
    @ConditionalOnMissingBean
    @Bean
    KafkaNamesMappers kafkaNameMappers() {
        return createDefaultKafkaNamesMapper(this.kafkaClustersProperties);
    }

    /** Wraps the given admin client in a {@link KafkaBrokerStorage}. */
    private BrokerStorage brokersStorage(AdminClient adminClient) {
        return new KafkaBrokerStorage(adminClient);
    }

    /**
     * Creates a {@link KafkaConsumerPool} configured from the cluster's consumer and SASL
     * settings.
     *
     * @param kafkaProperties cluster settings supplying consumer and SASL configuration
     * @param brokerStorage broker storage handed to the pool
     * @param bootstrapServers bootstrap servers string handed to the pool
     */
    private KafkaConsumerPool kafkaConsumersPool(KafkaProperties kafkaProperties, BrokerStorage brokerStorage, String bootstrapServers) {
        KafkaConsumerPoolConfig poolConfig = new KafkaConsumerPoolConfig(
                kafkaProperties.getKafkaConsumer().getCacheExpirationSeconds(),
                kafkaProperties.getKafkaConsumer().getBufferSizeBytes(),
                kafkaProperties.getKafkaConsumer().getFetchMaxWaitMillis(),
                kafkaProperties.getKafkaConsumer().getFetchMinBytes(),
                kafkaProperties.getKafkaConsumer().getNamePrefix(),
                kafkaProperties.getKafkaConsumer().getConsumerGroupName(),
                kafkaProperties.getSasl().isEnabled(),
                kafkaProperties.getSasl().getMechanism(),
                kafkaProperties.getSasl().getProtocol(),
                kafkaProperties.getSasl().getJaasConfig());
        return new KafkaConsumerPool(poolConfig, brokerStorage, bootstrapServers);
    }

    /**
     * Creates a Kafka {@link AdminClient} for the given cluster.
     *
     * <p>The security protocol defaults to {@code PLAINTEXT}; when SASL is enabled it is replaced
     * by the configured SASL protocol and the SASL mechanism and JAAS config are added.
     */
    private AdminClient brokerAdminClient(KafkaProperties kafkaProperties) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getBootstrapKafkaServer());
        properties.put("security.protocol", "PLAINTEXT");
        properties.put("request.timeout.ms", kafkaProperties.getKafkaServerRequestTimeoutMillis());
        if (kafkaProperties.getSasl().isEnabled()) {
            properties.put("sasl.mechanism", kafkaProperties.getSasl().getMechanism());
            // Overrides the PLAINTEXT default set above.
            properties.put("security.protocol", kafkaProperties.getSasl().getProtocol());
            properties.put("sasl.jaas.config", kafkaProperties.getSasl().getJaasConfig());
        }
        return AdminClient.create(properties);
    }
}
