package pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit;

import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName;
import pl.allegro.tech.hermes.common.kafka.SimpleConsumerPool;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.common.message.wrapper.MessageContentWrapper;
import pl.allegro.tech.hermes.domain.topic.schema.SchemaRepository;
import pl.allegro.tech.hermes.management.domain.message.RetransmissionService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader;

/* loaded from: input_file:pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.class */
public class KafkaRetransmissionService implements RetransmissionService {
    private final BrokerStorage brokerStorage;
    private final KafkaRawMessageReader kafkaRawMessageReader;
    private final MessageContentWrapper messageContentWrapper;
    private final SubscriptionOffsetChangeIndicator subscriptionOffsetChange;
    private final SimpleConsumerPool simpleConsumerPool;
    private final KafkaNamesMapper kafkaNamesMapper;
    private final SchemaRepository schemaRepository;

    public KafkaRetransmissionService(BrokerStorage brokerStorage, KafkaRawMessageReader kafkaRawMessageReader, MessageContentWrapper messageContentWrapper, SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator, SimpleConsumerPool simpleConsumerPool, KafkaNamesMapper kafkaNamesMapper, SchemaRepository schemaRepository) {
        this.brokerStorage = brokerStorage;
        this.kafkaRawMessageReader = kafkaRawMessageReader;
        this.messageContentWrapper = messageContentWrapper;
        this.subscriptionOffsetChange = subscriptionOffsetChangeIndicator;
        this.simpleConsumerPool = simpleConsumerPool;
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.schemaRepository = schemaRepository;
    }

    @Override // pl.allegro.tech.hermes.management.domain.message.RetransmissionService
    public List<PartitionOffset> indicateOffsetChange(Topic topic, String str, String str2, long j, boolean z) {
        ArrayList arrayList = new ArrayList();
        this.kafkaNamesMapper.toKafkaTopics(topic).forEach(kafkaTopic -> {
            for (Integer num : this.brokerStorage.readPartitionsIds(kafkaTopic.name().asString())) {
                PartitionOffset partitionOffset = new PartitionOffset(kafkaTopic.name(), getLastOffset(createSimpleConsumer(kafkaTopic.name(), num.intValue()), topic, kafkaTopic, num.intValue(), j), num.intValue());
                arrayList.add(partitionOffset);
                if (!z) {
                    this.subscriptionOffsetChange.setSubscriptionOffset(topic.getName(), str, str2, partitionOffset);
                }
            }
        });
        return arrayList;
    }

    private SimpleConsumer createSimpleConsumer(KafkaTopicName kafkaTopicName, int i) {
        return this.simpleConsumerPool.get(Integer.valueOf(this.brokerStorage.readLeaderForPartition(new TopicAndPartition(kafkaTopicName.asString(), i))));
    }

    private long getLastOffset(SimpleConsumer simpleConsumer, Topic topic, KafkaTopic kafkaTopic, int i, long j) {
        return search(topic, kafkaTopic, i, getOffsetRange(simpleConsumer, kafkaTopic, i), j);
    }

    private long search(Topic topic, KafkaTopic kafkaTopic, int i, Range<Long> range, long j) {
        return new OffsetSearcher(new KafkaTimestampExtractor(topic, kafkaTopic, i, this.kafkaRawMessageReader, this.messageContentWrapper, this.schemaRepository)).search(range, j);
    }

    private Range<Long> getOffsetRange(SimpleConsumer simpleConsumer, KafkaTopic kafkaTopic, int i) {
        return Range.closed(Long.valueOf(getOffset(simpleConsumer, kafkaTopic, i, OffsetRequest.EarliestTime())), Long.valueOf(getOffset(simpleConsumer, kafkaTopic, i, OffsetRequest.LatestTime())));
    }

    private long getOffset(SimpleConsumer simpleConsumer, KafkaTopic kafkaTopic, int i, long j) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(kafkaTopic.name().asString(), i);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(j, 1));
        return readOffsetFromResponse(simpleConsumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(hashMap, OffsetRequest.CurrentVersion(), "KafkaRetransmissionService" + kafkaTopic.name().asString())), topicAndPartition);
    }

    private long readOffsetFromResponse(OffsetResponse offsetResponse, TopicAndPartition topicAndPartition) {
        if (offsetResponse.hasError()) {
            throw new OffsetNotFoundException(offsetResponse.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
        }
        return offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
    }
}
