package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.kafka.SimpleConsumerPool;
import pl.allegro.tech.hermes.management.domain.topic.SingleMessageReader;
import pl.allegro.tech.hermes.management.domain.topic.SingleMessageReaderException;

/* loaded from: input_file:pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaSingleMessageReader.class */
/**
 * {@link SingleMessageReader} that fetches one message from a Kafka topic partition
 * using a {@link SimpleConsumer} obtained from a {@link SimpleConsumerPool}.
 */
public class KafkaSingleMessageReader implements SingleMessageReader {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSingleMessageReader.class);
    private final SimpleConsumerPool simpleConsumerPool;

    /**
     * @param simpleConsumerPool pool supplying the consumer and the fetch buffer size
     */
    public KafkaSingleMessageReader(SimpleConsumerPool simpleConsumerPool) {
        this.simpleConsumerPool = simpleConsumerPool;
    }

    /**
     * Fetches the message stored at the given offset and returns its payload decoded as UTF-8.
     *
     * <p>Messages in the fetch response whose offset is lower than the requested one are
     * logged and skipped; messages with a higher offset are ignored.
     *
     * @param topicName topic to read from (its qualified name is used as the Kafka topic)
     * @param i         partition number
     * @param j         offset of the message to read
     * @return the message payload decoded as a UTF-8 string
     * @throws SingleMessageReaderException if the fetch response reports an error, if no
     *         message with exactly the requested offset is present in the response, or if
     *         the message at that offset has no payload
     */
    @Override // pl.allegro.tech.hermes.management.domain.topic.SingleMessageReader
    public String readMessage(TopicName topicName, int i, long j) {
        final int partition = i;
        final long requestedOffset = j;
        final String topic = topicName.qualifiedName();

        SimpleConsumer consumer = this.simpleConsumerPool.get(topic, partition);
        FetchRequest request = createFetchRequest(
                consumer.clientId(), topicName, partition, requestedOffset, this.simpleConsumerPool.getBufferSize());
        FetchResponse response = consumer.fetch(request);

        if (response.hasError()) {
            short errorCode = response.errorCode(topic, partition);
            logger.error("Cannot read offset {} from topic/partition {}/{}. Error code: {}",
                    requestedOffset, topic, partition, errorCode);
            throw new SingleMessageReaderException(String.format(
                    "Cannot read offset %d from topic/partition %s/%d. Error code: %d",
                    requestedOffset, topic, partition, errorCode));
        }

        for (MessageAndOffset messageAndOffset : response.messageSet(topic, partition)) {
            long offset = messageAndOffset.offset();
            if (offset < requestedOffset) {
                logger.info("Found an old offset: {} Expecting: {}", offset, requestedOffset);
            } else if (offset == requestedOffset) {
                // Only the matching message is decoded; later messages are never copied.
                return decodePayload(messageAndOffset.message().payload(), topic, partition, requestedOffset);
            }
        }

        short errorCode = response.errorCode(topic, partition);
        logger.error("Cannot find message by offset {} from topic/partition {}/{}. Error code: {}",
                requestedOffset, topic, partition, errorCode);
        throw new SingleMessageReaderException(String.format(
                "Cannot find message by offset %d from topic/partition %s/%d. Error code: %d",
                requestedOffset, topic, partition, errorCode));
    }

    /**
     * Copies the readable bytes of {@code payload} and decodes them as UTF-8.
     * The caller's buffer position is left untouched.
     */
    private String decodePayload(ByteBuffer payload, String topic, int partition, long offset) {
        // NOTE(review): Kafka's Message.payload() is believed to return null for messages
        // without a payload -- confirm against the Kafka version in use.
        if (payload == null) {
            throw new SingleMessageReaderException(String.format(
                    "Message at offset %d from topic/partition %s/%d has no payload", offset, topic, partition));
        }
        // Work on a duplicate and size by remaining() so a non-zero position cannot underflow.
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Builds a fetch request for a single topic partition.
     *
     * @param clientId  client id to put in the request
     * @param topicName topic whose qualified name is fetched
     * @param partition partition number
     * @param offset    offset to start fetching from
     * @param fetchSize fetch size passed to {@code addFetch}
     */
    private FetchRequest createFetchRequest(String clientId, TopicName topicName, int partition, long offset, int fetchSize) {
        return new FetchRequestBuilder()
                .clientId(clientId)
                .addFetch(topicName.qualifiedName(), partition, offset, fetchSize)
                .build();
    }
}
