/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka.consumer.support.batching;

import java.util.ArrayList;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.StopWatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class KafkaRecordBatchingProcessor
extends KafkaRecordProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordBatchingProcessor.class);
    private final KafkaConfiguration configuration;
    private final Processor processor;
    private final CommitManager commitManager;
    private final StopWatch watch = new StopWatch();
    private List<Exchange> exchangeList;

    public KafkaRecordBatchingProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager) {
        this.configuration = configuration;
        this.processor = processor;
        this.commitManager = commitManager;
    }

    public Exchange toExchange(KafkaConsumer camelKafkaConsumer, TopicPartition topicPartition, ConsumerRecord<Object, Object> consumerRecord) {
        Exchange exchange = camelKafkaConsumer.createExchange(false);
        Message message = exchange.getMessage();
        this.setupExchangeMessage(message, consumerRecord);
        this.propagateHeaders(this.configuration, consumerRecord, exchange);
        if (this.configuration.isAllowManualCommit()) {
            KafkaManualCommit manual = this.commitManager.getManualCommit(exchange, topicPartition, consumerRecord);
            message.setHeader("CamelKafkaManualCommit", (Object)manual);
        }
        return exchange;
    }

    public ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, ConsumerRecords<Object, Object> consumerRecords) {
        LOG.debug("There's {} records to process ... max poll is set to {}", (Object)consumerRecords.count(), (Object)this.configuration.getMaxPollRecords());
        if (this.exchangeList == null) {
            this.exchangeList = new ArrayList<Exchange>(this.configuration.getMaxPollRecords());
            this.watch.takenAndRestart();
        }
        if (this.hasExpiredRecords(consumerRecords)) {
            LOG.debug("The polling timeout has expired with {} records in cache. Dispatching the incomplete batch for processing", (Object)this.exchangeList.size());
            this.processBatch(camelKafkaConsumer);
            this.exchangeList = null;
            return ProcessingResult.newUnprocessed();
        }
        for (ConsumerRecord consumerRecord : consumerRecords) {
            TopicPartition tp = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
            Exchange childExchange = this.toExchange(camelKafkaConsumer, tp, (ConsumerRecord<Object, Object>)consumerRecord);
            this.exchangeList.add(childExchange);
            if (this.exchangeList.size() != this.configuration.getMaxPollRecords().intValue()) continue;
            this.processBatch(camelKafkaConsumer);
            this.exchangeList = null;
        }
        return ProcessingResult.newUnprocessed();
    }

    private boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords) {
        return !this.exchangeList.isEmpty() && consumerRecords.isEmpty() && this.watch.taken() >= this.configuration.getPollTimeoutMs();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ProcessingResult processBatch(KafkaConsumer camelKafkaConsumer) {
        Exchange exchange = camelKafkaConsumer.createExchange(false);
        Message message = exchange.getMessage();
        message.setBody(this.exchangeList);
        try {
            if (this.configuration.isAllowManualCommit()) {
                ProcessingResult processingResult = this.manualCommitResultProcessing(camelKafkaConsumer, exchange);
                return processingResult;
            }
            ProcessingResult processingResult = this.autoCommitResultProcessing(camelKafkaConsumer, exchange);
            return processingResult;
        }
        finally {
            camelKafkaConsumer.releaseExchange(exchange, false);
        }
    }

    private ProcessingResult autoCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange) {
        ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
        CommitSynchronization commitSynchronization = new CommitSynchronization(exceptionHandler);
        exchange.getExchangeExtension().addOnCompletion((Synchronization)commitSynchronization);
        try {
            this.processor.process(exchange);
        }
        catch (Exception e) {
            exchange.setException((Throwable)e);
        }
        return commitSynchronization.result;
    }

    private ProcessingResult manualCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange) {
        ProcessingResult result;
        try {
            this.processor.process(exchange);
        }
        catch (Exception e) {
            exchange.setException((Throwable)e);
        }
        if (exchange.getException() != null) {
            LOG.debug("An exception was thrown for batch records");
            ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
            boolean handled = this.processException(exchange, exceptionHandler);
            result = new ProcessingResult(false, handled);
        } else {
            result = new ProcessingResult(false, false);
        }
        return result;
    }

    private boolean processException(Exchange exchange, ExceptionHandler exceptionHandler) {
        exceptionHandler.handleException("Error during processing", exchange, (Throwable)exchange.getException());
        return true;
    }

    private final class CommitSynchronization
    implements Synchronization {
        private final ExceptionHandler exceptionHandler;
        private ProcessingResult result;

        public CommitSynchronization(ExceptionHandler exceptionHandler) {
            this.exceptionHandler = exceptionHandler;
        }

        public void onComplete(Exchange exchange) {
            List exchanges = (List)exchange.getMessage().getBody(List.class);
            if (exchanges == null || exchanges.isEmpty()) {
                LOG.warn("The exchange is {}", (Object)(exchanges == null ? "not of the expected type (null)" : "empty"));
                return;
            }
            LOG.debug("Calling commit on {} exchanges using {}", (Object)exchanges.size(), (Object)KafkaRecordBatchingProcessor.this.commitManager.getClass().getSimpleName());
            KafkaRecordBatchingProcessor.this.commitManager.commit();
            this.result = new ProcessingResult(false, false);
        }

        public void onFailure(Exchange exchange) {
            Exception cause = exchange.getException();
            if (cause != null) {
                this.exceptionHandler.handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, (Throwable)cause);
            } else {
                LOG.warn("Skipping auto-commit on the batch because processing the exchanged has failed and the error was not correctly handled");
            }
            this.result = new ProcessingResult(false, true);
        }
    }
}

