/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit;

import com.google.common.collect.Range;
import java.util.HashMap;
import java.util.List;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.json.MessageContentWrapper;
import pl.allegro.tech.hermes.common.kafka.SimpleConsumerPool;
import pl.allegro.tech.hermes.domain.subscription.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.management.domain.message.RetransmissionService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit.KafkaTimestampExtractor;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit.OffsetNotFoundException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit.OffsetSearcher;

public class KafkaRetransmissionService
implements RetransmissionService {
    private final BrokerStorage brokerStorage;
    private final KafkaSingleMessageReader kafkaSingleMessageReader;
    private final MessageContentWrapper messageContentWrapper;
    private final SubscriptionOffsetChangeIndicator subscriptionOffsetChange;
    private final SimpleConsumerPool simpleConsumerPool;

    public KafkaRetransmissionService(BrokerStorage brokerStorage, KafkaSingleMessageReader kafkaSingleMessageReader, MessageContentWrapper messageContentWrapper, SubscriptionOffsetChangeIndicator subscriptionOffsetChange, SimpleConsumerPool simpleConsumerPool) {
        this.brokerStorage = brokerStorage;
        this.kafkaSingleMessageReader = kafkaSingleMessageReader;
        this.messageContentWrapper = messageContentWrapper;
        this.subscriptionOffsetChange = subscriptionOffsetChange;
        this.simpleConsumerPool = simpleConsumerPool;
    }

    @Override
    public void indicateOffsetChange(TopicName topic, String subscription, String brokersClusterName, long timestamp) {
        List partitionsIds = this.brokerStorage.readPartitionsIds(topic.qualifiedName());
        for (Integer partitionId : partitionsIds) {
            SimpleConsumer consumer = this.createSimpleConsumer(topic, partitionId);
            long offset = this.getLastOffset(consumer, topic, partitionId, timestamp);
            this.subscriptionOffsetChange.setSubscriptionOffset(topic, subscription, brokersClusterName, partitionId.intValue(), Long.valueOf(offset));
        }
    }

    private SimpleConsumer createSimpleConsumer(TopicName topic, int partition) {
        Integer leader = this.brokerStorage.readLeaderForPartition(new TopicAndPartition(topic.qualifiedName(), partition));
        return this.simpleConsumerPool.get(leader);
    }

    private long getLastOffset(SimpleConsumer consumer, TopicName topic, int partition, long timestamp) {
        Range<Long> offsetRange = this.getOffsetRange(consumer, topic, partition);
        return this.search(topic, partition, offsetRange, timestamp);
    }

    private Long search(TopicName topic, int partition, Range<Long> offsetRange, long timestamp) {
        OffsetSearcher searcher = new OffsetSearcher(new KafkaTimestampExtractor(topic, partition, this.kafkaSingleMessageReader, this.messageContentWrapper));
        return searcher.search(offsetRange, timestamp);
    }

    private Range<Long> getOffsetRange(SimpleConsumer simpleConsumer, TopicName topic, int partition) {
        long earliestOffset = this.getOffset(simpleConsumer, topic, partition, kafka.api.OffsetRequest.EarliestTime());
        long latestOffset = this.getOffset(simpleConsumer, topic, partition, kafka.api.OffsetRequest.LatestTime());
        return Range.closed((Comparable)Long.valueOf(earliestOffset), (Comparable)Long.valueOf(latestOffset));
    }

    private long getOffset(SimpleConsumer simpleConsumer, TopicName topic, int partition, long whichTime) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic.qualifiedName(), partition);
        HashMap<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "KafkaRetransmissionService" + topic);
        OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
        return this.readOffsetFromResponse(response, topicAndPartition);
    }

    private long readOffsetFromResponse(OffsetResponse response, TopicAndPartition topicAndPartition) {
        if (response.hasError()) {
            throw new OffsetNotFoundException(response.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
        }
        long[] offsets = response.offsets(topicAndPartition.topic(), topicAndPartition.partition());
        return offsets[0];
    }
}

