/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka.consumer.support;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaRecordProcessorFacade {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessorFacade.class);
    private final KafkaConsumer camelKafkaConsumer;
    private final String threadId;
    private final KafkaRecordProcessor kafkaRecordProcessor;
    private final CommitManager commitManager;
    private final KafkaConsumerListener consumerListener;

    public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, String threadId, CommitManager commitManager, KafkaConsumerListener consumerListener) {
        this.camelKafkaConsumer = camelKafkaConsumer;
        this.threadId = threadId;
        this.commitManager = commitManager;
        this.kafkaRecordProcessor = this.buildKafkaRecordProcessor(commitManager);
        this.consumerListener = consumerListener;
    }

    private boolean isStopping() {
        return this.camelKafkaConsumer.isStopping();
    }

    public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
        this.logRecords(allRecords);
        ProcessingResult result = ProcessingResult.newUnprocessed();
        Set partitions = allRecords.partitions();
        Iterator partitionIterator = partitions.iterator();
        LOG.debug("Poll received records on {} partitions", (Object)partitions.size());
        while (partitionIterator.hasNext() && !this.isStopping()) {
            TopicPartition partition = (TopicPartition)partitionIterator.next();
            LOG.debug("Processing records on partition {}", (Object)partition.partition());
            List partitionRecords = allRecords.records(partition);
            Iterator recordIterator = partitionRecords.iterator();
            this.logRecordsInPartition(partitionRecords, partition);
            while (!result.isBreakOnErrorHit() && recordIterator.hasNext() && !this.isStopping()) {
                ConsumerRecord record = (ConsumerRecord)recordIterator.next();
                LOG.debug("Processing record on partition {} with offset {}", (Object)record.partition(), (Object)record.offset());
                result = this.processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), this.kafkaRecordProcessor, (ConsumerRecord<Object, Object>)record);
                LOG.debug("Processed record on partition {} with offset {}", (Object)record.partition(), (Object)record.offset());
                if (this.consumerListener == null || this.consumerListener.afterProcess(result)) continue;
                this.commitManager.commit(partition);
                return result;
            }
            if (result.isBreakOnErrorHit()) continue;
            LOG.debug("Committing offset on successful execution");
            this.commitManager.commit(partition);
        }
        return result;
    }

    private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Records count {} received for partition {}", (Object)partitionRecords.size(), (Object)partition);
        }
    }

    private void logRecords(ConsumerRecords<Object, Object> allRecords) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Last poll on thread {} resulted on {} records to process", (Object)this.threadId, (Object)allRecords.count());
        }
    }

    private ProcessingResult processRecord(TopicPartition partition, boolean partitionHasNext, boolean recordHasNext, KafkaRecordProcessor kafkaRecordProcessor, ConsumerRecord<Object, Object> record) {
        this.logRecord(record);
        Exchange exchange = this.camelKafkaConsumer.createExchange(false);
        ProcessingResult result = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext, recordHasNext, record, this.camelKafkaConsumer.getExceptionHandler());
        this.camelKafkaConsumer.releaseExchange(exchange, false);
        return result;
    }

    private void logRecord(ConsumerRecord<Object, Object> record) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", new Object[]{record.partition(), record.offset(), record.key(), record.value()});
        }
    }

    private KafkaRecordProcessor buildKafkaRecordProcessor(CommitManager commitManager) {
        return new KafkaRecordProcessor(this.camelKafkaConsumer.getEndpoint().getConfiguration(), this.camelKafkaConsumer.getProcessor(), commitManager);
    }
}

