package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import java.nio.ByteBuffer;
import java.util.Iterator;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.SimpleConsumerPool;
import pl.allegro.tech.hermes.management.domain.topic.SingleMessageReaderException;

/* loaded from: input_file:pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaRawMessageReader.class */
/**
 * Reads the raw payload bytes of a single message, identified by topic, partition and offset,
 * using a {@link SimpleConsumer} obtained from a {@link SimpleConsumerPool}.
 */
public class KafkaRawMessageReader {
    private static final Logger logger = LoggerFactory.getLogger(KafkaRawMessageReader.class);
    private final SimpleConsumerPool simpleConsumerPool;

    /**
     * @param simpleConsumerPool pool that supplies the consumer for a topic partition and the
     *                           buffer size used as the fetch size
     */
    public KafkaRawMessageReader(SimpleConsumerPool simpleConsumerPool) {
        this.simpleConsumerPool = simpleConsumerPool;
    }

    /**
     * Fetches messages starting at {@code offset} and returns the payload of the message whose
     * offset is exactly {@code offset}.
     *
     * @param kafkaTopic topic to read from
     * @param partition  partition of the topic
     * @param offset     offset of the requested message
     * @return the payload bytes of the matching message
     * @throws SingleMessageReaderException if the fetch response reports an error or no message
     *                                      with the requested offset is present in the response
     */
    public byte[] readMessage(KafkaTopic kafkaTopic, int partition, long offset) {
        FetchResponse response = fetch(kafkaTopic, partition, offset);
        for (MessageAndOffset messageAndOffset : response.messageSet(kafkaTopic.name().asString(), partition)) {
            if (messageAndOffset.offset() == offset) {
                return readPayloadAsBytes(messageAndOffset);
            }
            // The response may contain messages other than the requested one; skip them.
            logger.info("Found an old offset: {} Expecting: {}", messageAndOffset.offset(), offset);
        }
        throw messageReaderException(kafkaTopic, partition, offset, response, "Cannot find message");
    }

    /**
     * Copies the readable bytes of the message payload into a new array.
     */
    private byte[] readPayloadAsBytes(MessageAndOffset messageAndOffset) {
        ByteBuffer payload = messageAndOffset.message().payload();
        // Size by remaining(), not limit(): get(byte[]) copies from the current position, so an
        // array of limit() bytes would underflow whenever the position is not zero.
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        return bytes;
    }

    /**
     * Issues a single fetch request for the given topic partition starting at {@code offset}.
     *
     * @throws SingleMessageReaderException if the response reports an error
     */
    private FetchResponse fetch(KafkaTopic kafkaTopic, int partition, long offset) {
        SimpleConsumer consumer = this.simpleConsumerPool.get(kafkaTopic, partition);
        FetchResponse response = consumer.fetch(
                new FetchRequestBuilder()
                        .clientId(consumer.clientId())
                        .addFetch(kafkaTopic.name().asString(), partition, offset, this.simpleConsumerPool.getBufferSize())
                        .build());
        if (response.hasError()) {
            throw messageReaderException(kafkaTopic, partition, offset, response, "Cannot read offset");
        }
        return response;
    }

    /**
     * Builds the exception describing a failed read, logging its message at error level first.
     * The exception is returned rather than thrown so that call sites can use {@code throw}.
     */
    private SingleMessageReaderException messageReaderException(
            KafkaTopic kafkaTopic, int partition, long offset, FetchResponse fetchResponse, String message) {
        String topicName = kafkaTopic.name().asString();
        String cause = message + String.format(
                "[offset %d, kafka_topic %s, partition %d, kafka_response_code: %d]",
                offset, topicName, partition, fetchResponse.errorCode(topicName, partition));
        logger.error(cause);
        return new SingleMessageReaderException(cause);
    }
}
