package com.softwaremill.kmq;

import com.softwaremill.kmq.MarkerKey;
import com.softwaremill.kmq.MarkerValue;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/softwaremill/kmq/KmqClient.class */
public class KmqClient<K, V> implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KmqClient.class);
    private final KmqConfig config;
    private final long msgPollTimeout;
    private final KafkaConsumer<K, V> msgConsumer;
    private final KafkaProducer<MarkerKey, MarkerValue> markerProducer;

    public KmqClient(KmqConfig kmqConfig, KafkaClients kafkaClients, Class<? extends Deserializer<K>> cls, Class<? extends Deserializer<V>> cls2, long j) {
        this.config = kmqConfig;
        this.msgPollTimeout = j;
        this.msgConsumer = kafkaClients.createConsumer(kmqConfig.getMsgConsumerGroupId(), cls, cls2);
        this.markerProducer = kafkaClients.createProducer(MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class, Collections.singletonMap("partitioner.class", ParititionFromMarkerKey.class));
        LOG.info(String.format("Subscribing to topic: %s, using group id: %s", kmqConfig.getMsgTopic(), kmqConfig.getMsgConsumerGroupId()));
        this.msgConsumer.subscribe(Collections.singletonList(kmqConfig.getMsgTopic()));
    }

    public ConsumerRecords<K, V> nextBatch() {
        ArrayList arrayList = new ArrayList();
        ConsumerRecords<K, V> poll = this.msgConsumer.poll(this.msgPollTimeout);
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            arrayList.add(this.markerProducer.send(new ProducerRecord(this.config.getMarkerTopic(), MarkerKey.fromRecord((ConsumerRecord) it.next()), new StartMarker(this.config.getMsgTimeoutMs()))));
        }
        arrayList.forEach(future -> {
            try {
                future.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        this.msgConsumer.commitSync();
        return poll;
    }

    public Future<RecordMetadata> processed(ConsumerRecord<K, V> consumerRecord) {
        return this.markerProducer.send(new ProducerRecord(this.config.getMarkerTopic(), MarkerKey.fromRecord(consumerRecord), EndMarker.INSTANCE));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.msgConsumer.close();
        this.markerProducer.close();
    }
}
