package pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.kafka.KafkaConsumerPool;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.management.domain.message.RetransmissionService;

/* loaded from: input_file:pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.class */
/**
 * {@link RetransmissionService} implementation that resolves, for every partition of the Kafka
 * topics mapped from a Hermes {@link Topic}, the offset matching a given timestamp and optionally
 * hands it to the {@link SubscriptionOffsetChangeIndicator}.
 */
public class KafkaRetransmissionService implements RetransmissionService {
    private final BrokerStorage brokerStorage;
    private final SubscriptionOffsetChangeIndicator subscriptionOffsetChange;
    private final KafkaConsumerPool consumerPool;
    private final KafkaNamesMapper kafkaNamesMapper;

    /**
     * @param brokerStorage source of partition ids for a Kafka topic name
     * @param subscriptionOffsetChangeIndicator receives the computed offsets and answers whether they were moved
     * @param kafkaConsumerPool supplies the consumer used for offset lookups on a topic partition
     * @param kafkaNamesMapper maps a Hermes topic to its Kafka topics
     */
    public KafkaRetransmissionService(BrokerStorage brokerStorage, SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator, KafkaConsumerPool kafkaConsumerPool, KafkaNamesMapper kafkaNamesMapper) {
        this.brokerStorage = brokerStorage;
        this.subscriptionOffsetChange = subscriptionOffsetChangeIndicator;
        this.consumerPool = kafkaConsumerPool;
        this.kafkaNamesMapper = kafkaNamesMapper;
    }

    /**
     * Computes, for every partition of every Kafka topic mapped from {@code topic}, the offset
     * corresponding to timestamp {@code j}, and unless {@code z} is set passes each offset to the
     * {@link SubscriptionOffsetChangeIndicator}.
     *
     * @param topic Hermes topic whose Kafka topics are inspected
     * @param str forwarded unchanged to the indicator; presumably the subscription name (confirm against the interface)
     * @param str2 forwarded unchanged to the indicator; presumably the brokers cluster name (confirm against the interface)
     * @param j timestamp used for the offset lookup
     * @param z when {@code true} the offsets are only computed and returned, not passed to the indicator
     * @return the computed offsets, one per partition, in the order they were visited
     * @throws OffsetNotFoundException if the ending offset of any partition cannot be determined
     */
    @Override
    public List<PartitionOffset> indicateOffsetChange(Topic topic, String str, String str2, long j, boolean z) {
        List<PartitionOffset> offsets = new ArrayList<>();
        this.kafkaNamesMapper.toKafkaTopics(topic).forEach(kafkaTopic -> {
            for (int partition : this.brokerStorage.readPartitionsIds(kafkaTopic.name().asString())) {
                KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(kafkaTopic, partition);
                long offset = findClosestOffsetJustBeforeTimestamp(consumer, kafkaTopic, partition, j);
                PartitionOffset partitionOffset = new PartitionOffset(kafkaTopic.name(), offset, partition);
                offsets.add(partitionOffset);
                if (!z) {
                    this.subscriptionOffsetChange.setSubscriptionOffset(topic.getName(), str, str2, partitionOffset);
                }
            }
        });
        return offsets;
    }

    /**
     * Asks the {@link SubscriptionOffsetChangeIndicator} whether offsets were moved for every
     * Kafka topic mapped from {@code topic}, using the partition ids currently reported by the
     * broker storage.
     *
     * @param topic Hermes topic whose Kafka topics are checked
     * @param str forwarded unchanged to the indicator; presumably the subscription name (confirm against the interface)
     * @param str2 forwarded unchanged to the indicator; presumably the brokers cluster name (confirm against the interface)
     * @return {@code true} only if the indicator reports moved offsets for all mapped Kafka topics
     */
    @Override
    public boolean areOffsetsMoved(Topic topic, String str, String str2) {
        return this.kafkaNamesMapper.toKafkaTopics(topic).allMatch(kafkaTopic ->
                this.subscriptionOffsetChange.areOffsetsMoved(
                        topic.getName(),
                        str,
                        str2,
                        kafkaTopic,
                        this.brokerStorage.readPartitionsIds(kafkaTopic.name().asString())));
    }

    /** Obtains the consumer for the given topic partition from the pool. */
    private KafkaConsumer<byte[], byte[]> createKafkaConsumer(KafkaTopic kafkaTopic, int partition) {
        return this.consumerPool.get(kafkaTopic, partition);
    }

    /**
     * Looks up the offset Kafka associates with {@code timestamp} on the given partition, falling
     * back to the partition's ending offset when the lookup yields no entry.
     */
    private long findClosestOffsetJustBeforeTimestamp(KafkaConsumer<byte[], byte[]> consumer, KafkaTopic kafkaTopic, int partition, long timestamp) {
        TopicPartition topicPartition = new TopicPartition(kafkaTopic.name().asString(), partition);
        // Resolved up front (before the timestamp lookup) so that a missing ending offset
        // always surfaces as OffsetNotFoundException, regardless of the lookup result.
        long endingOffset = getEndingOffset(consumer, topicPartition);
        OffsetAndTimestamp found =
                consumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp)).get(topicPartition);
        return found != null ? found.offset() : endingOffset;
    }

    /**
     * Returns the ending offset of {@code topicPartition} as reported by the consumer.
     *
     * @throws OffsetNotFoundException if the consumer returns no ending offset for the partition
     */
    private long getEndingOffset(KafkaConsumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
        return Optional.ofNullable(consumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition))
                .orElseThrow(() -> new OffsetNotFoundException(
                        String.format("Ending offset for partition %s not found", topicPartition)));
    }
}
