package pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit;

import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.kafka.SimpleConsumerPool;
import pl.allegro.tech.hermes.common.message.wrapper.JsonMessageContentWrapper;
import pl.allegro.tech.hermes.domain.subscription.offset.PartitionOffset;
import pl.allegro.tech.hermes.domain.subscription.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;
import pl.allegro.tech.hermes.management.domain.message.RetransmissionService;
import pl.allegro.tech.hermes.management.domain.topic.SingleMessageReader;

/* loaded from: input_file:pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.class */
public class KafkaRetransmissionService implements RetransmissionService {
    private final BrokerStorage brokerStorage;
    private final SingleMessageReader singleMessageReader;
    private final JsonMessageContentWrapper messageContentWrapper;
    private final SubscriptionOffsetChangeIndicator subscriptionOffsetChange;
    private final SimpleConsumerPool simpleConsumerPool;
    private final TopicRepository topicRepository;

    public KafkaRetransmissionService(BrokerStorage brokerStorage, SingleMessageReader singleMessageReader, JsonMessageContentWrapper jsonMessageContentWrapper, SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator, SimpleConsumerPool simpleConsumerPool, TopicRepository topicRepository) {
        this.brokerStorage = brokerStorage;
        this.singleMessageReader = singleMessageReader;
        this.messageContentWrapper = jsonMessageContentWrapper;
        this.subscriptionOffsetChange = subscriptionOffsetChangeIndicator;
        this.simpleConsumerPool = simpleConsumerPool;
        this.topicRepository = topicRepository;
    }

    @Override // pl.allegro.tech.hermes.management.domain.message.RetransmissionService
    public List<PartitionOffset> indicateOffsetChange(TopicName topicName, String str, String str2, long j, boolean z) {
        ArrayList arrayList = new ArrayList();
        for (Integer num : this.brokerStorage.readPartitionsIds(topicName.qualifiedName())) {
            long lastOffset = getLastOffset(createSimpleConsumer(topicName, num.intValue()), this.topicRepository.getTopicDetails(topicName), num.intValue(), j);
            arrayList.add(new PartitionOffset(lastOffset, num.intValue()));
            if (!z) {
                this.subscriptionOffsetChange.setSubscriptionOffset(topicName, str, str2, num.intValue(), Long.valueOf(lastOffset));
            }
        }
        return arrayList;
    }

    private SimpleConsumer createSimpleConsumer(TopicName topicName, int i) {
        return this.simpleConsumerPool.get(Integer.valueOf(this.brokerStorage.readLeaderForPartition(new TopicAndPartition(topicName.qualifiedName(), i))));
    }

    private long getLastOffset(SimpleConsumer simpleConsumer, Topic topic, int i, long j) {
        return search(topic, i, getOffsetRange(simpleConsumer, topic, i), j);
    }

    private long search(Topic topic, int i, Range<Long> range, long j) {
        return new OffsetSearcher(new KafkaTimestampExtractor(topic, i, this.singleMessageReader, this.messageContentWrapper)).search(range, j);
    }

    private Range<Long> getOffsetRange(SimpleConsumer simpleConsumer, Topic topic, int i) {
        return Range.closed(Long.valueOf(getOffset(simpleConsumer, topic, i, OffsetRequest.EarliestTime())), Long.valueOf(getOffset(simpleConsumer, topic, i, OffsetRequest.LatestTime())));
    }

    private long getOffset(SimpleConsumer simpleConsumer, Topic topic, int i, long j) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic.getQualifiedName(), i);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(j, 1));
        return readOffsetFromResponse(simpleConsumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(hashMap, OffsetRequest.CurrentVersion(), "KafkaRetransmissionService" + topic)), topicAndPartition);
    }

    private long readOffsetFromResponse(OffsetResponse offsetResponse, TopicAndPartition topicAndPartition) {
        if (offsetResponse.hasError()) {
            throw new OffsetNotFoundException(offsetResponse.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
        }
        return offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
    }
}
