package pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit;

import com.google.common.collect.Range;
import java.util.HashMap;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.json.MessageContentWrapper;
import pl.allegro.tech.hermes.common.kafka.SimpleConsumerPool;
import pl.allegro.tech.hermes.domain.subscription.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.management.domain.message.RetransmissionService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader;

/* loaded from: input_file:pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.class */
public class KafkaRetransmissionService implements RetransmissionService {
    private final BrokerStorage brokerStorage;
    private final KafkaSingleMessageReader kafkaSingleMessageReader;
    private final MessageContentWrapper messageContentWrapper;
    private final SubscriptionOffsetChangeIndicator subscriptionOffsetChange;
    private final SimpleConsumerPool simpleConsumerPool;

    public KafkaRetransmissionService(BrokerStorage brokerStorage, KafkaSingleMessageReader kafkaSingleMessageReader, MessageContentWrapper messageContentWrapper, SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator, SimpleConsumerPool simpleConsumerPool) {
        this.brokerStorage = brokerStorage;
        this.kafkaSingleMessageReader = kafkaSingleMessageReader;
        this.messageContentWrapper = messageContentWrapper;
        this.subscriptionOffsetChange = subscriptionOffsetChangeIndicator;
        this.simpleConsumerPool = simpleConsumerPool;
    }

    @Override // pl.allegro.tech.hermes.management.domain.message.RetransmissionService
    public void indicateOffsetChange(TopicName topicName, String str, String str2, long j) {
        for (Integer num : this.brokerStorage.readPartitionsIds(topicName.qualifiedName())) {
            this.subscriptionOffsetChange.setSubscriptionOffset(topicName, str, str2, num.intValue(), Long.valueOf(getLastOffset(createSimpleConsumer(topicName, num.intValue()), topicName, num.intValue(), j)));
        }
    }

    private SimpleConsumer createSimpleConsumer(TopicName topicName, int i) {
        return this.simpleConsumerPool.get(Integer.valueOf(this.brokerStorage.readLeaderForPartition(new TopicAndPartition(topicName.qualifiedName(), i))));
    }

    private long getLastOffset(SimpleConsumer simpleConsumer, TopicName topicName, int i, long j) {
        return search(topicName, i, getOffsetRange(simpleConsumer, topicName, i), j).longValue();
    }

    private Long search(TopicName topicName, int i, Range<Long> range, long j) {
        return Long.valueOf(new OffsetSearcher(new KafkaTimestampExtractor(topicName, i, this.kafkaSingleMessageReader, this.messageContentWrapper)).search(range, j));
    }

    private Range<Long> getOffsetRange(SimpleConsumer simpleConsumer, TopicName topicName, int i) {
        return Range.closed(Long.valueOf(getOffset(simpleConsumer, topicName, i, OffsetRequest.EarliestTime())), Long.valueOf(getOffset(simpleConsumer, topicName, i, OffsetRequest.LatestTime())));
    }

    private long getOffset(SimpleConsumer simpleConsumer, TopicName topicName, int i, long j) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topicName.qualifiedName(), i);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(j, 1));
        return readOffsetFromResponse(simpleConsumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(hashMap, OffsetRequest.CurrentVersion(), "KafkaRetransmissionService" + topicName)), topicAndPartition);
    }

    private long readOffsetFromResponse(OffsetResponse offsetResponse, TopicAndPartition topicAndPartition) {
        if (offsetResponse.hasError()) {
            throw new OffsetNotFoundException(offsetResponse.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
        }
        return offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
    }
}
