/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka.consumer.support.streaming;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import org.apache.camel.component.kafka.consumer.support.AbstractKafkaRecordProcessorFacade;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.component.kafka.consumer.support.streaming.KafkaRecordStreamingProcessor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaRecordStreamingProcessorFacade
extends AbstractKafkaRecordProcessorFacade {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordStreamingProcessorFacade.class);
    private final KafkaRecordStreamingProcessor kafkaRecordProcessor;

    public KafkaRecordStreamingProcessorFacade(KafkaConsumer camelKafkaConsumer, String threadId, CommitManager commitManager, KafkaConsumerListener consumerListener) {
        super(camelKafkaConsumer, threadId, commitManager, consumerListener);
        this.kafkaRecordProcessor = this.buildKafkaRecordProcessor(commitManager);
    }

    private KafkaRecordStreamingProcessor buildKafkaRecordProcessor(CommitManager commitManager) {
        return new KafkaRecordStreamingProcessor(this.camelKafkaConsumer.getEndpoint().getConfiguration(), this.camelKafkaConsumer.getProcessor(), commitManager);
    }

    private ProcessingResult processRecord(TopicPartition partition, boolean partitionHasNext, boolean recordHasNext, KafkaRecordStreamingProcessor kafkaRecordProcessor, ConsumerRecord<Object, Object> consumerRecord) {
        this.logRecord(consumerRecord);
        return kafkaRecordProcessor.processExchange(this.camelKafkaConsumer, partition, partitionHasNext, recordHasNext, consumerRecord);
    }

    @Override
    public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
        this.logRecords(allRecords);
        ProcessingResult result = ProcessingResult.newUnprocessed();
        Set partitions = allRecords.partitions();
        Iterator partitionIterator = partitions.iterator();
        LOG.debug("Poll received records on {} partitions", (Object)partitions.size());
        while (partitionIterator.hasNext() && !this.isStopping()) {
            TopicPartition partition = (TopicPartition)partitionIterator.next();
            LOG.debug("Processing records on partition {}", (Object)partition.partition());
            List partitionRecords = allRecords.records(partition);
            Iterator recordIterator = partitionRecords.iterator();
            this.logRecordsInPartition(partitionRecords, partition);
            while (!result.isBreakOnErrorHit() && recordIterator.hasNext() && !this.isStopping()) {
                ConsumerRecord consumerRecord = (ConsumerRecord)recordIterator.next();
                LOG.debug("Processing record on partition {} with offset {}", (Object)consumerRecord.partition(), (Object)consumerRecord.offset());
                result = this.processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), this.kafkaRecordProcessor, (ConsumerRecord<Object, Object>)consumerRecord);
                LOG.debug("Processed record on partition {} with offset {}", (Object)consumerRecord.partition(), (Object)consumerRecord.offset());
                if (this.consumerListener == null || this.consumerListener.afterProcess(result)) continue;
                this.commitManager.commit(partition);
                return result;
            }
            if (result.isBreakOnErrorHit()) continue;
            LOG.debug("Committing offset on successful execution");
            this.commitManager.commit(partition);
        }
        return result;
    }
}

