/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.context.ApplicationEvent;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaResourceHolder;
import org.springframework.kafka.core.ProducerFactoryUtils;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.DelegatingMessageListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.GenericErrorHandler;
import org.springframework.kafka.listener.GenericMessageListener;
import org.springframework.kafka.listener.ListenerType;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.listener.LoggingErrorHandler;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.RemainingRecordsErrorHandler;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class KafkaMessageListenerContainer<K, V>
extends AbstractMessageListenerContainer<K, V> {
    /** Factory used to create the Kafka consumer when the listener consumer is constructed. */
    private final ConsumerFactory<K, V> consumerFactory;
    /** Explicitly configured partitions; when {@code null} the consumer subscribes by topic or pattern instead. */
    private final TopicPartitionInitialOffset[] topicPartitions;
    /** The consumer runnable created by {@code doStart()}; {@code null} until then. */
    private volatile ListenerConsumer listenerConsumer;
    /** Future returned when the listener consumer is submitted to the task executor. */
    private volatile ListenableFuture<?> listenerConsumerFuture;
    /** The message listener taken from the container properties in {@code doStart()}. */
    private GenericMessageListener<?> listener;
    /** Optional suffix handed to the consumer factory when the consumer is created. */
    private String clientIdSuffix;

    /**
     * Create a container without explicit partitions; the partitions (if any) are
     * then taken from the supplied container properties by the delegate constructor.
     *
     * @param consumerFactory the factory used to create the consumer; must not be {@code null}
     * @param containerProperties the container configuration
     */
    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) {
        // The cast makes it explicit that a null array (not a one-element array) is passed.
        this(consumerFactory, containerProperties, (TopicPartitionInitialOffset[]) null);
    }

    /**
     * Create a container for the given factory and properties.
     *
     * @param consumerFactory the factory used to create the consumer; must not be {@code null}
     * @param containerProperties the container configuration
     * @param topicPartitions explicit partitions (defensively copied); when {@code null},
     * the partitions configured on {@code containerProperties} are used instead
     */
    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionInitialOffset ... topicPartitions) {
        super(containerProperties);
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
        this.consumerFactory = consumerFactory;
        if (topicPartitions == null) {
            this.topicPartitions = containerProperties.getTopicPartitions();
        } else {
            this.topicPartitions = Arrays.copyOf(topicPartitions, topicPartitions.length);
        }
    }

    /**
     * Set the suffix that is passed to the consumer factory when the consumer is created.
     *
     * @param clientIdSuffix the suffix, may be {@code null}
     */
    public void setClientIdSuffix(String clientIdSuffix) {
        this.clientIdSuffix = clientIdSuffix;
    }

    /**
     * Return the partitions this container is currently working with.
     *
     * <p>Explicitly defined partitions take precedence over partitions received
     * through a rebalance.
     *
     * @return an unmodifiable view of the partitions, or {@code null} when the
     * container has no listener consumer or nothing has been assigned yet
     */
    public Collection<TopicPartition> getAssignedPartitions() {
        ListenerConsumer current = this.listenerConsumer;
        if (current == null) {
            return null;
        }
        Map<TopicPartition, OffsetMetadata> defined = current.definedPartitions;
        if (defined != null) {
            return Collections.unmodifiableCollection(defined.keySet());
        }
        Collection<TopicPartition> assigned = current.assignedPartitions;
        return assigned == null ? null : Collections.unmodifiableCollection(assigned);
    }

    /**
     * Return the consumer metrics keyed by the consumer's client id.
     *
     * <p>The client id is read from the {@code client-id} tag of the first metric name.
     *
     * @return a single-entry map, or an empty map when there is no listener consumer
     * or the consumer reports no metrics
     */
    @Override
    public Map<String, Map<MetricName, ? extends Metric>> metrics() {
        ListenerConsumer current = this.listenerConsumer;
        if (current == null) {
            return Collections.emptyMap();
        }
        Map<MetricName, ? extends Metric> consumerMetrics = current.consumer.metrics();
        Iterator<MetricName> names = consumerMetrics.keySet().iterator();
        if (!names.hasNext()) {
            return Collections.emptyMap();
        }
        String clientId = names.next().tags().get("client-id");
        return Collections.singletonMap(clientId, consumerMetrics);
    }

    /**
     * Validate the configuration, create the {@link ListenerConsumer} and submit it
     * to the consumer task executor. Does nothing when the container is already running.
     *
     * @throws IllegalStateException when the ack configuration is invalid, no message
     * listener is configured, or the listener is not a {@link GenericMessageListener}
     */
    @Override
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();

        // Ack settings only matter when the container (not the Kafka client) commits offsets.
        if (!this.consumerFactory.isAutoCommit()) {
            AbstractMessageListenerContainer.AckMode ackMode = containerProperties.getAckMode();
            boolean countAndTime = ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT_TIME);
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT) || countAndTime) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            boolean timeBased = ackMode.equals(AbstractMessageListenerContainer.AckMode.TIME) || countAndTime;
            if (timeBased && containerProperties.getAckTime() == 0L) {
                // Fall back to a 5 second ack interval when none was configured.
                containerProperties.setAckTime(5000L);
            }
        }

        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");

        if (containerProperties.getConsumerTaskExecutor() == null) {
            String beanName = getBeanName();
            String threadPrefix = (beanName == null ? "" : beanName) + "-C-";
            containerProperties.setConsumerTaskExecutor(new SimpleAsyncTaskExecutor(threadPrefix));
        }

        Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
        this.listener = (GenericMessageListener<?>) messageListener;

        ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);
        if (this.listener instanceof DelegatingMessageListener) {
            // Unwrap nested delegating listeners; the innermost delegate decides the listener type.
            Object delegate = this.listener;
            while (delegate instanceof DelegatingMessageListener) {
                delegate = ((DelegatingMessageListener<?>) delegate).getDelegate();
            }
            listenerType = ListenerUtils.determineListenerType(delegate);
        }

        this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);
        setRunning(true);
        this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);
    }

    /**
     * Stop the container: register a completion callback on the consumer future,
     * mark the container as not running and wake the consumer up.
     * Does nothing when the container is not running.
     *
     * @param callback invoked once the consumer task has completed, whether it
     * succeeded or failed; may be {@code null}
     */
    @Override
    protected void doStop(final Runnable callback) {
        if (!isRunning()) {
            return;
        }
        ListenableFutureCallback<Object> completion = new ListenableFutureCallback<Object>() {

            @Override
            public void onSuccess(Object result) {
                if (KafkaMessageListenerContainer.this.logger.isDebugEnabled()) {
                    KafkaMessageListenerContainer.this.logger.debug(KafkaMessageListenerContainer.this + " stopped normally");
                }
                if (callback != null) {
                    callback.run();
                }
            }

            @Override
            public void onFailure(Throwable e) {
                KafkaMessageListenerContainer.this.logger.error("Error while stopping the container: ", e);
                if (callback != null) {
                    callback.run();
                }
            }
        };
        this.listenerConsumerFuture.addCallback(completion);
        // Clear the running flag before the wakeup so the poll loop sees it when poll() is interrupted.
        setRunning(false);
        this.listenerConsumer.consumer.wakeup();
    }

    /**
     * Publish a {@link ListenerContainerIdleEvent} when an event publisher is available.
     *
     * @param idleTime the idle time reported in the event
     * @param consumer the consumer to attach to the event, may be {@code null}
     */
    private void publishIdleContainerEvent(long idleTime, Consumer<?, ?> consumer) {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        ListenerContainerIdleEvent event = new ListenerContainerIdleEvent(this, idleTime, getBeanName(), getAssignedPartitions(), consumer);
        getApplicationEventPublisher().publishEvent((ApplicationEvent) event);
    }

    /**
     * Publish a {@link NonResponsiveConsumerEvent} when an event publisher is available.
     *
     * @param timeSinceLastPoll the time since the last poll reported in the event
     * @param consumer the consumer to attach to the event
     */
    private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<?, ?> consumer) {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        NonResponsiveConsumerEvent event = new NonResponsiveConsumerEvent(this, timeSinceLastPoll, getBeanName(), getAssignedPartitions(), consumer);
        getApplicationEventPublisher().publishEvent((ApplicationEvent) event);
    }

    /**
     * Describe this container by bean name, optional client id suffix and partitions.
     *
     * <p>The assigned partitions are read exactly once so that the null check and the
     * printed value always refer to the same snapshot, even if the container is
     * started, stopped or rebalanced concurrently.
     *
     * @return a human-readable description of this container
     */
    @Override
    public String toString() {
        Collection<TopicPartition> assigned = getAssignedPartitions();
        return "KafkaMessageListenerContainer [id=" + getBeanName()
                + (this.clientIdSuffix != null ? ", clientIndex=" + this.clientIdSuffix : "")
                + ", topicPartitions=" + (assigned == null ? "none assigned" : assigned)
                + "]";
    }

    /**
     * Immutable holder pairing an initial offset with a flag telling whether that
     * offset is relative to the current position.
     */
    private static final class OffsetMetadata {
        /** The configured initial offset; may be {@code null}. */
        private final Long offset;
        /** Whether {@link #offset} is relative to the current position. */
        private final boolean relativeToCurrent;

        OffsetMetadata(Long initialOffset, boolean isRelativeToCurrent) {
            this.relativeToCurrent = isRelativeToCurrent;
            this.offset = initialOffset;
        }
    }

    private static final class LoggingCommitCallback
    implements OffsetCommitCallback {
        private static final Log logger = LogFactory.getLog(LoggingCommitCallback.class);

        LoggingCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                logger.error((Object)("Commit failed for " + offsets), (Throwable)exception);
            } else if (logger.isDebugEnabled()) {
                logger.debug((Object)("Commits for " + offsets + " completed"));
            }
        }
    }

    private final class ListenerConsumer
    implements SchedulingAwareRunnable,
    ConsumerSeekAware.ConsumerSeekCallback {
        // NOTE: the declaration order below is significant; several initializers read
        // fields declared above them (containerProperties, isManualAck, transactionManager).
        private final Log logger = LogFactory.getLog(ListenerConsumer.class);
        private final ContainerProperties containerProperties = KafkaMessageListenerContainer.this.getContainerProperties();
        /** The user supplied commit callback, or a logging callback when none is configured. */
        private final OffsetCommitCallback commitCallback = this.containerProperties.getCommitCallback() != null ? this.containerProperties.getCommitCallback() : new LoggingCommitCallback();
        private final Consumer<K, V> consumer;
        private final Map<String, Map<Integer, Long>> offsets = new HashMap<String, Map<Integer, Long>>();
        private final GenericMessageListener<?> genericListener;
        /** Record listener; {@code null} when a batch listener is used. */
        private final MessageListener<K, V> listener;
        /** Batch listener; {@code null} when a record listener is used. */
        private final BatchMessageListener<K, V> batchListener;
        private final ListenerType listenerType;
        private final boolean isConsumerAwareListener;
        private final boolean isBatchListener;
        // Read the enclosing container's factory directly (the decompiler had emitted a
        // call to a synthetic access$NNN method here, which is not valid source).
        private final boolean autoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
        private final boolean isManualAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.MANUAL);
        private final boolean isManualImmediateAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        private final boolean isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
        private final boolean isRecordAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.RECORD);
        private final boolean isBatchAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.BATCH);
        /** Acknowledged records queued from threads other than the consumer thread. */
        private final BlockingQueue<ConsumerRecord<K, V>> acks = new LinkedBlockingQueue<ConsumerRecord<K, V>>();
        private final BlockingQueue<TopicPartitionInitialOffset> seeks = new LinkedBlockingQueue<TopicPartitionInitialOffset>();
        private final ErrorHandler errorHandler;
        private final BatchErrorHandler batchErrorHandler;
        private final PlatformTransactionManager transactionManager = this.containerProperties.getTransactionManager();
        /** The transaction manager when it is a {@link KafkaTransactionManager}, otherwise {@code null}. */
        private final KafkaTransactionManager kafkaTxManager = this.transactionManager instanceof KafkaTransactionManager ? (KafkaTransactionManager)this.transactionManager : null;
        private final TransactionTemplate transactionTemplate;
        /** Group id from the container properties, falling back to the factory's {@code group.id} property. */
        private final String consumerGroupId = this.containerProperties.getGroupId() == null ? (String)KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties().get("group.id") : this.containerProperties.getGroupId();
        private final TaskScheduler taskScheduler;
        private final ScheduledFuture<?> monitorTask;
        /** Explicitly assigned partitions with their initial offsets; {@code null} when subscribing. */
        private volatile Map<TopicPartition, OffsetMetadata> definedPartitions;
        /** Partitions received from the most recent rebalance; {@code null} until the first assignment. */
        private volatile Collection<TopicPartition> assignedPartitions;
        private volatile Thread consumerThread;
        private int count;
        private long last = System.currentTimeMillis();
        private boolean fatalError;
        /** {@code true} when the scheduler came from the container properties and must not be destroyed here. */
        private boolean taskSchedulerExplicitlySet;
        /** Timestamp of the last completed poll, read by the monitor task. */
        private volatile long lastPoll = System.currentTimeMillis();

        /**
         * Create the Kafka consumer, subscribe or assign it, resolve listener and
         * error handler, and schedule the consumer monitor task.
         *
         * @param listener the message listener (record or batch)
         * @param listenerType the listener type determined by the container
         * @throws IllegalStateException when manual acks are combined with auto commit,
         * or {@code AckMode.RECORD} is combined with a batch listener
         * @throws IllegalArgumentException when the listener is neither a record nor a batch listener
         */
        @SuppressWarnings("unchecked")
        ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
            Assert.state(!this.isAnyManualAck || !this.autoCommit, "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());

            final Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(this.consumerGroupId, KafkaMessageListenerContainer.this.clientIdSuffix);
            this.consumer = consumer;
            ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);

            TopicPartitionInitialOffset[] explicitPartitions = KafkaMessageListenerContainer.this.topicPartitions;
            if (explicitPartitions != null) {
                // Manual assignment: remember each partition's initial offset settings.
                this.definedPartitions = new HashMap<TopicPartition, OffsetMetadata>(explicitPartitions.length);
                for (TopicPartitionInitialOffset explicit : explicitPartitions) {
                    this.definedPartitions.put(explicit.topicPartition(), new OffsetMetadata(explicit.initialOffset(), explicit.isRelativeToCurrent()));
                }
                consumer.assign(new ArrayList<TopicPartition>(this.definedPartitions.keySet()));
            } else if (this.containerProperties.getTopicPattern() != null) {
                consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
            } else {
                consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
            }

            GenericErrorHandler<?> errHandler = this.containerProperties.getGenericErrorHandler();
            this.genericListener = listener;
            if (listener instanceof BatchMessageListener) {
                this.listener = null;
                this.batchListener = (BatchMessageListener<K, V>) listener;
                this.isBatchListener = true;
            } else if (listener instanceof MessageListener) {
                this.listener = (MessageListener<K, V>) listener;
                this.batchListener = null;
                this.isBatchListener = false;
            } else {
                throw new IllegalArgumentException("Listener must be one of 'MessageListener', 'BatchMessageListener', or the variants that are consumer aware and/or Acknowledging not " + listener.getClass().getName());
            }

            this.listenerType = listenerType;
            this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || listenerType.equals(ListenerType.CONSUMER_AWARE);

            // The configured handler applies to the listener kind in use; the other kind
            // always gets a logging handler.
            validateErrorHandler(this.isBatchListener);
            if (this.isBatchListener) {
                this.errorHandler = new LoggingErrorHandler();
                this.batchErrorHandler = determineBatchErrorHandler(errHandler);
            } else {
                this.errorHandler = determineErrorHandler(errHandler);
                this.batchErrorHandler = new BatchLoggingErrorHandler();
            }
            Assert.state(!this.isBatchListener || !this.isRecordAck, "Cannot use AckMode.RECORD with a batch listener");

            if (this.transactionManager == null) {
                this.transactionTemplate = null;
            } else {
                this.transactionTemplate = new TransactionTemplate(this.transactionManager);
            }

            TaskScheduler configuredScheduler = this.containerProperties.getScheduler();
            if (configuredScheduler == null) {
                // No scheduler supplied: create a private one, destroyed again when run() ends.
                ThreadPoolTaskScheduler ownScheduler = new ThreadPoolTaskScheduler();
                ownScheduler.initialize();
                this.taskScheduler = ownScheduler;
            } else {
                this.taskScheduler = this.containerProperties.getScheduler();
                this.taskSchedulerExplicitlySet = true;
            }
            this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer, (long)(this.containerProperties.getMonitorInterval() * 1000));
        }

        /**
         * Monitor task body: publish a non-responsive consumer event when the time
         * since the last poll, expressed in multiples of the poll timeout, exceeds
         * the configured no-poll threshold.
         */
        protected void checkConsumer() {
            long sinceLastPoll = System.currentTimeMillis() - this.lastPoll;
            float elapsedPollTimeouts = (float) sinceLastPoll / (float) this.containerProperties.getPollTimeout();
            if (elapsedPollTimeouts > this.containerProperties.getNoPollThreshold()) {
                KafkaMessageListenerContainer.this.publishNonResponsiveConsumerEvent(sinceLastPoll, this.consumer);
            }
        }

        /**
         * Choose the batch error handler.
         *
         * @param errHandler the configured handler, may be {@code null}
         * @return the configured handler when present; otherwise {@code null} when a
         * transaction manager is configured, or a logging handler when not
         */
        protected BatchErrorHandler determineBatchErrorHandler(GenericErrorHandler<?> errHandler) {
            if (errHandler != null) {
                return (BatchErrorHandler) errHandler;
            }
            if (this.transactionManager != null) {
                return null;
            }
            return new BatchLoggingErrorHandler();
        }

        /**
         * Choose the record error handler.
         *
         * @param errHandler the configured handler, may be {@code null}
         * @return the configured handler when present; otherwise {@code null} when a
         * transaction manager is configured, or a logging handler when not
         */
        protected ErrorHandler determineErrorHandler(GenericErrorHandler<?> errHandler) {
            if (errHandler != null) {
                return (ErrorHandler) errHandler;
            }
            if (this.transactionManager != null) {
                return null;
            }
            return new LoggingErrorHandler();
        }

        /**
         * Create the rebalance listener registered with the consumer on subscribe.
         *
         * <p>On revocation it notifies the user listener, commits pending acks and, for
         * consumer-aware listeners, notifies again after the commit. On assignment it
         * records the partitions, commits the current positions (unless the consumer
         * auto-commits), lets a {@link ConsumerSeekAware} listener seek, and finally
         * notifies the user listener.
         *
         * <p>The asynchronous commit on assignment uses this consumer's resolved
         * {@code commitCallback} (the user callback or the logging fallback), so commit
         * failures are reported even when no callback was configured; this is
         * the same callback {@code ackImmediate} uses.
         *
         * @param consumer the consumer passed to consumer-aware listener callbacks
         * @return the rebalance listener
         */
        public ConsumerRebalanceListener createRebalanceListener(final Consumer<K, V> consumer) {
            return new ConsumerRebalanceListener(){
                final ConsumerRebalanceListener userListener;
                final ConsumerAwareRebalanceListener consumerAwareListener;
                {
                    this.userListener = KafkaMessageListenerContainer.this.getContainerProperties().getConsumerRebalanceListener();
                    this.consumerAwareListener = this.userListener instanceof ConsumerAwareRebalanceListener ? (ConsumerAwareRebalanceListener)this.userListener : null;
                }

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    if (this.consumerAwareListener != null) {
                        this.consumerAwareListener.onPartitionsRevokedBeforeCommit(consumer, partitions);
                    } else {
                        this.userListener.onPartitionsRevoked(partitions);
                    }
                    ListenerConsumer.this.commitPendingAcks();
                    if (this.consumerAwareListener != null) {
                        this.consumerAwareListener.onPartitionsRevokedAfterCommit(consumer, partitions);
                    }
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    ListenerConsumer.this.assignedPartitions = partitions;
                    if (!ListenerConsumer.this.autoCommit) {
                        // Commit the current position of every newly assigned partition.
                        final HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
                        for (TopicPartition partition : partitions) {
                            try {
                                offsets.put(partition, new OffsetAndMetadata(consumer.position(partition)));
                            }
                            catch (NoOffsetForPartitionException e) {
                                // Flag the error for run(), which stops the container when the flag is set.
                                ListenerConsumer.this.fatalError = true;
                                ListenerConsumer.this.logger.error("No offset and no reset policy", e);
                                return;
                            }
                        }
                        if (ListenerConsumer.this.logger.isDebugEnabled()) {
                            ListenerConsumer.this.logger.debug("Committing on assignment: " + offsets);
                        }
                        if (ListenerConsumer.this.transactionTemplate != null && ListenerConsumer.this.kafkaTxManager != null) {
                            ListenerConsumer.this.transactionTemplate.execute((TransactionCallback)new TransactionCallbackWithoutResult(){

                                @Override
                                protected void doInTransactionWithoutResult(TransactionStatus status) {
                                    ((KafkaResourceHolder)((Object)TransactionSynchronizationManager.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))).getProducer().sendOffsetsToTransaction(offsets, ListenerConsumer.this.consumerGroupId);
                                }
                            });
                        } else if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) {
                            ListenerConsumer.this.consumer.commitSync(offsets);
                        } else {
                            // Use the resolved callback (never null) rather than the raw, possibly
                            // null, container property so that failures are always logged.
                            ListenerConsumer.this.consumer.commitAsync(offsets, ListenerConsumer.this.commitCallback);
                        }
                    }
                    if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
                        ListenerConsumer.this.seekPartitions(partitions, false);
                    }
                    if (this.consumerAwareListener != null) {
                        this.consumerAwareListener.onPartitionsAssigned(consumer, partitions);
                    } else {
                        this.userListener.onPartitionsAssigned(partitions);
                    }
                }
            };
        }

        /**
         * Hand the current position of each partition, together with a seek callback
         * bound to this consumer, to the {@link ConsumerSeekAware} listener.
         *
         * <p>Callers must ensure {@code genericListener} is a {@link ConsumerSeekAware}.
         * Nothing is done when {@code partitions} is {@code null}: the idle path passes
         * the container's assigned partitions, which are {@code null} until an
         * assignment exists, and iterating them would otherwise throw a
         * {@link NullPointerException} inside the poll loop.
         *
         * @param partitions the partitions to report, may be {@code null}
         * @param idle {@code true} to call {@code onIdleContainer}, {@code false} to
         * call {@code onPartitionsAssigned}
         */
        private void seekPartitions(Collection<TopicPartition> partitions, boolean idle) {
            if (partitions == null) {
                return;
            }
            Map<TopicPartition, Long> current = new HashMap<TopicPartition, Long>();
            for (TopicPartition topicPartition : partitions) {
                current.put(topicPartition, this.consumer.position(topicPartition));
            }
            ConsumerSeekAware.ConsumerSeekCallback callback = new ConsumerSeekAware.ConsumerSeekCallback(){

                @Override
                public void seek(String topic, int partition, long offset) {
                    ListenerConsumer.this.consumer.seek(new TopicPartition(topic, partition), offset);
                }

                @Override
                public void seekToBeginning(String topic, int partition) {
                    ListenerConsumer.this.consumer.seekToBeginning(Collections.singletonList(new TopicPartition(topic, partition)));
                }

                @Override
                public void seekToEnd(String topic, int partition) {
                    ListenerConsumer.this.consumer.seekToEnd(Collections.singletonList(new TopicPartition(topic, partition)));
                }
            };
            ConsumerSeekAware seekAware = (ConsumerSeekAware)((Object)this.genericListener);
            if (idle) {
                seekAware.onIdleContainer(current, callback);
            } else {
                seekAware.onPartitionsAssigned(current, callback);
            }
        }

        /**
         * Verify that the configured error handler matches the listener kind.
         *
         * <p>The check is made on the handler from the container properties (not on
         * the {@code errorHandler} field, which is still unassigned when the
         * constructor calls this method). {@code instanceof} is used so that handlers
         * inheriting the interface from a superclass or a sub-interface are accepted.
         *
         * @param batch {@code true} when a batch listener is in use
         * @throws IllegalStateException when a handler is configured and it is not a
         * {@link BatchErrorHandler} (batch) or {@link ErrorHandler} (record)
         */
        private void validateErrorHandler(boolean batch) {
            GenericErrorHandler<?> errHandler = this.containerProperties.getGenericErrorHandler();
            if (errHandler == null) {
                // Nothing configured; a default handler is chosen by the caller.
                return;
            }
            boolean compatible = batch ? errHandler instanceof BatchErrorHandler : errHandler instanceof ErrorHandler;
            Assert.state(compatible, "Error handler is not compatible with the message listener, expecting an instance of " + (batch ? "BatchErrorHandler" : "ErrorHandler") + " not " + errHandler.getClass().getName());
        }

        /**
         * Report this runnable as long-lived to the scheduling infrastructure.
         *
         * @return always {@code true}
         */
        @Override
        public boolean isLongLived() {
            return true;
        }

        /**
         * Consumer thread body: polls the consumer and dispatches records to the
         * listener for as long as the container is running, then releases the
         * consumer and the monitor task.
         */
        public void run() {
            long lastReceive;
            // Remember the polling thread; processAck() compares against it to decide
            // whether an ack must be queued or can be handled directly.
            this.consumerThread = Thread.currentThread();
            if (this.genericListener instanceof ConsumerSeekAware) {
                ((ConsumerSeekAware)((Object)this.genericListener)).registerSeekCallback(this);
            }
            if (this.transactionManager != null) {
                ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
            }
            this.count = 0;
            this.last = System.currentTimeMillis();
            // Only relevant for explicitly assigned partitions (helper not shown in this
            // view; presumably applies the configured initial offsets).
            if (KafkaMessageListenerContainer.this.isRunning() && this.definedPartitions != null) {
                this.initPartitionsIfNeeded();
            }
            long lastAlertAt = lastReceive = System.currentTimeMillis();
            while (KafkaMessageListenerContainer.this.isRunning()) {
                try {
                    long now;
                    if (!this.autoCommit && !this.isRecordAck) {
                        this.processCommits();
                    }
                    this.processSeeks();
                    ConsumerRecords records = this.consumer.poll(this.containerProperties.getPollTimeout());
                    // Timestamp read by checkConsumer() to detect a non-responsive consumer.
                    this.lastPoll = System.currentTimeMillis();
                    if (records != null && this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)("Received: " + records.count() + " records"));
                    }
                    if (records != null && records.count() > 0) {
                        if (this.containerProperties.getIdleEventInterval() != null) {
                            lastReceive = System.currentTimeMillis();
                        }
                        this.invokeListener(records);
                        continue;
                    }
                    // Nothing received: publish an idle event only when idle events are enabled
                    // and a full interval has passed since both the last record and the last alert.
                    if (this.containerProperties.getIdleEventInterval() == null || (now = System.currentTimeMillis()) <= lastReceive + this.containerProperties.getIdleEventInterval() || now <= lastAlertAt + this.containerProperties.getIdleEventInterval()) continue;
                    KafkaMessageListenerContainer.this.publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener ? this.consumer : null);
                    lastAlertAt = now;
                    if (!(this.genericListener instanceof ConsumerSeekAware)) continue;
                    // NOTE(review): getAssignedPartitions() can return null here.
                    this.seekPartitions(KafkaMessageListenerContainer.this.getAssignedPartitions(), true);
                }
                catch (WakeupException records) {
                    // Deliberately ignored: doStop() clears the running flag and then calls
                    // consumer.wakeup(), so the loop condition ends the loop.
                }
                catch (NoOffsetForPartitionException nofpe) {
                    this.fatalError = true;
                    this.logger.error((Object)"No offset and no reset policy", (Throwable)nofpe);
                    break;
                }
                catch (Exception e) {
                    // Container-level failure: delegate to the configured handler (with no
                    // record data), or just log and keep polling.
                    if (this.containerProperties.getGenericErrorHandler() != null) {
                        this.containerProperties.getGenericErrorHandler().handle(e, null);
                        continue;
                    }
                    this.logger.error((Object)"Container exception", (Throwable)e);
                }
            }
            ProducerFactoryUtils.clearConsumerGroupId();
            if (!this.fatalError) {
                // Pending acks are committed here only when no KafkaTransactionManager is in use.
                if (this.kafkaTxManager == null) {
                    this.commitPendingAcks();
                    try {
                        this.consumer.unsubscribe();
                    }
                    // A wakeup during unsubscribe is ignored; the consumer is closed below.
                    catch (WakeupException wakeupException) {}
                }
            } else {
                this.logger.error((Object)"No offset and no reset policy; stopping container");
                KafkaMessageListenerContainer.this.stop();
            }
            this.monitorTask.cancel(true);
            // Only destroy the scheduler when this consumer created it itself.
            if (!this.taskSchedulerExplicitlySet) {
                ((ThreadPoolTaskScheduler)this.taskScheduler).destroy();
            }
            this.consumer.close();
            if (this.logger.isInfoEnabled()) {
                this.logger.info((Object)"Consumer stopped");
            }
        }

        /**
         * Process outstanding commits and then, when offsets are still recorded,
         * commit them if necessary.
         */
        private void commitPendingAcks() {
            this.processCommits();
            if (this.offsets.isEmpty()) {
                return;
            }
            this.commitIfNecessary();
        }

        /**
         * Drain the queue of acknowledged records without blocking and process each one.
         */
        private void handleAcks() {
            ConsumerRecord<K, V> acked;
            while ((acked = this.acks.poll()) != null) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Ack: " + acked);
                }
                this.processAck(acked);
            }
        }

        /**
         * Handle an acknowledgment for a record.
         *
         * <p>On the consumer thread the ack is either committed immediately (manual
         * immediate mode) or its offset is recorded. On any other thread the record
         * is queued.
         *
         * @param record the acknowledged record
         * @throws KafkaException when interrupted while queueing the ack
         */
        private void processAck(ConsumerRecord<K, V> record) {
            boolean onConsumerThread = Thread.currentThread().equals(this.consumerThread);
            if (onConsumerThread) {
                if (!this.isManualImmediateAck) {
                    this.addOffset(record);
                    return;
                }
                try {
                    this.ackImmediate(record);
                }
                catch (WakeupException ignored) {
                    // Deliberately ignored, as in the poll loop.
                }
                return;
            }
            try {
                this.acks.put(record);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted while storing ack", e);
            }
        }

        /**
         * Commit the offset following the given record right away, synchronously or
         * asynchronously depending on the container's {@code syncCommits} setting.
         *
         * @param record the record whose successor offset is committed
         */
        private void ackImmediate(ConsumerRecord<K, V> record) {
            TopicPartition partition = new TopicPartition(record.topic(), record.partition());
            // The committed offset is the next offset to read, hence +1.
            OffsetAndMetadata nextOffset = new OffsetAndMetadata(record.offset() + 1L);
            Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(partition, nextOffset);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Committing: " + commits);
            }
            if (!this.containerProperties.isSyncCommits()) {
                this.consumer.commitAsync(commits, this.commitCallback);
                return;
            }
            this.consumer.commitSync(commits);
        }

        /**
         * Routes the polled records to the batch listener path or the per-record
         * listener path, depending on {@code isBatchListener}.
         *
         * @param records the records returned by the poll
         */
        private void invokeListener(ConsumerRecords<K, V> records) {
            if (!this.isBatchListener) {
                this.invokeRecordListener(records);
                return;
            }
            this.invokeBatchListener(records);
        }

        /**
         * Copies the polled records into a list and, when the list is non-empty,
         * hands it to the batch listener - inside a transaction when a
         * {@code transactionTemplate} is configured, directly otherwise.
         *
         * @param records the records returned by the poll
         */
        private void invokeBatchListener(ConsumerRecords<K, V> records) {
            List<ConsumerRecord<K, V>> batch = new LinkedList<>();
            for (ConsumerRecord<K, V> polled : records) {
                batch.add(polled);
            }
            if (batch.isEmpty()) {
                return;
            }
            if (this.transactionTemplate == null) {
                this.doInvokeBatchListener(records, batch, null);
            } else {
                this.invokeBatchListenerInTx(records, batch);
            }
        }

        /**
         * Invokes the batch listener inside a transaction started through
         * {@code transactionTemplate}.
         * <p>
         * When a {@code kafkaTxManager} is configured, the producer bound to the
         * current transaction is looked up and passed on so the offsets can be sent
         * to the transaction. If {@code doInvokeBatchListener} returns an exception
         * it is rethrown from the callback. Any {@link RuntimeException} escaping
         * the template is logged, and the consumer is then sought back to the first
         * offset seen for each topic/partition in {@code records}.
         *
         * @param records the polled records (used to compute the seek positions)
         * @param recordList the same records as a list, passed to the listener
         */
        private void invokeBatchListenerInTx(final ConsumerRecords<K, V> records, final List<ConsumerRecord<K, V>> recordList) {
            try {
                this.transactionTemplate.execute((TransactionCallback)new TransactionCallbackWithoutResult(){

                    public void doInTransactionWithoutResult(TransactionStatus s) {
                        RuntimeException aborted;
                        Producer producer = null;
                        if (ListenerConsumer.this.kafkaTxManager != null) {
                            producer = ((KafkaResourceHolder)((Object)TransactionSynchronizationManager.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))).getProducer();
                        }
                        if ((aborted = ListenerConsumer.this.doInvokeBatchListener(records, recordList, producer)) != null) {
                            throw aborted;
                        }
                    }
                });
            }
            catch (RuntimeException e) {
                this.logger.error((Object)"Transaction rolled back", (Throwable)e);
                // First offset encountered per partition; a typed map keeps the
                // key/value types checked instead of relying on raw-type casts.
                Map<TopicPartition, Long> seekOffsets = new HashMap<>();
                for (ConsumerRecord<K, V> r : records) {
                    seekOffsets.computeIfAbsent(new TopicPartition(r.topic(), r.partition()), tp -> r.offset());
                }
                for (Map.Entry<TopicPartition, Long> entry : seekOffsets.entrySet()) {
                    long firstOffset = entry.getValue();
                    this.consumer.seek(entry.getKey(), firstOffset);
                }
            }
        }

        /**
         * Invokes the batch listener with the given records and performs the
         * follow-up acknowledgment and error handling.
         * <p>
         * On success, when neither a manual ack mode nor auto-commit is active, the
         * highest-offset record of each partition is queued on {@code acks} and, if
         * a producer was supplied, the offsets are sent to its transaction.
         * <p>
         * On a listener {@link RuntimeException}: the highest-offset records are
         * queued when ack-on-error is enabled (and auto-commit is off); without a
         * {@code batchErrorHandler} the exception is rethrown; otherwise the handler
         * is invoked and, if a producer was supplied, the offsets are sent to its
         * transaction.
         *
         * @param records the polled records, passed to the error handler
         * @param recordList the records as a list, passed to the listener
         * @param producer the transactional producer, or {@code null} when not in a Kafka transaction
         * @return a {@link RuntimeException} thrown while handling a listener error
         * (callers in this file rethrow it), or {@code null}
         * @throws Error if the error handling path throws an {@link Error}
         */
        private RuntimeException doInvokeBatchListener(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList, Producer producer) throws Error {
            try {
                // Pick the onMessage overload matching the listener type; the
                // acknowledging variants only get an Acknowledgment in a manual ack mode.
                switch (this.listenerType) {
                    case ACKNOWLEDGING_CONSUMER_AWARE: {
                        this.batchListener.onMessage(recordList, this.isAnyManualAck ? new ConsumerBatchAcknowledgment(recordList) : null, this.consumer);
                        break;
                    }
                    case ACKNOWLEDGING: {
                        this.batchListener.onMessage(recordList, this.isAnyManualAck ? new ConsumerBatchAcknowledgment(recordList) : null);
                        break;
                    }
                    case CONSUMER_AWARE: {
                        this.batchListener.onMessage(recordList, this.consumer);
                        break;
                    }
                    case SIMPLE: {
                        this.batchListener.onMessage(recordList);
                    }
                }
                // Container-managed acks: queue the highest offset per partition.
                if (!this.isAnyManualAck && !this.autoCommit) {
                    for (ConsumerRecord record : this.getHighestOffsetRecords(recordList)) {
                        this.acks.put(record);
                    }
                    if (producer != null) {
                        this.sendOffsetsToTransaction(producer);
                    }
                }
            }
            catch (RuntimeException e) {
                // Listener failed: optionally acknowledge the batch anyway.
                if (this.containerProperties.isAckOnError() && !this.autoCommit) {
                    for (ConsumerRecord record : this.getHighestOffsetRecords(recordList)) {
                        this.acks.add(record);
                    }
                }
                if (this.batchErrorHandler == null) {
                    throw e;
                }
                try {
                    this.batchErrorHandler.handle(e, records, this.consumer);
                    if (producer != null) {
                        this.sendOffsetsToTransaction(producer);
                    }
                }
                catch (RuntimeException ee) {
                    // Returned rather than thrown; the caller decides whether to rethrow.
                    // Note this also covers a failure of sendOffsetsToTransaction above.
                    this.logger.error((Object)"Error handler threw an exception", (Throwable)ee);
                    return ee;
                }
                catch (Error er) {
                    this.logger.error((Object)"Error handler threw an error", (Throwable)er);
                    throw er;
                }
            }
            catch (InterruptedException e) {
                // Raised by acks.put(); restore the interrupt flag and fall through to return null.
                Thread.currentThread().interrupt();
            }
            return null;
        }

        /**
         * Delivers the polled records one at a time - transactionally when a
         * {@code transactionTemplate} is configured, directly otherwise.
         *
         * @param records the records returned by the poll
         */
        private void invokeRecordListener(ConsumerRecords<K, V> records) {
            if (this.transactionTemplate == null) {
                this.doInvokeWithRecords(records);
                return;
            }
            this.innvokeRecordListenerInTx(records);
        }

        /**
         * Delivers each record to the listener inside its own transaction.
         * <p>
         * If a transaction ends with a {@link RuntimeException}, the failure is
         * logged, the consumer is sought back to the failed record's offset and the
         * remaining records are not processed.
         * <p>
         * NOTE(review): the method name is misspelled ("innvoke"); it is kept
         * because {@code invokeRecordListener} calls it under this name.
         *
         * @param records the records returned by the poll
         */
        private void innvokeRecordListenerInTx(ConsumerRecords<K, V> records) {
            final Iterator<ConsumerRecord<K, V>> remaining = records.iterator();
            while (remaining.hasNext()) {
                final ConsumerRecord<K, V> current = remaining.next();
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Processing " + current);
                }
                try {
                    this.transactionTemplate.execute((TransactionCallback)new TransactionCallbackWithoutResult(){

                        public void doInTransactionWithoutResult(TransactionStatus s) {
                            // Only look up the transaction-bound producer when a Kafka
                            // transaction manager is configured.
                            Producer txProducer = null;
                            if (ListenerConsumer.this.kafkaTxManager != null) {
                                txProducer = ((KafkaResourceHolder)((Object)TransactionSynchronizationManager.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))).getProducer();
                            }
                            RuntimeException failure = ListenerConsumer.this.doInvokeRecordListener(current, txProducer, remaining);
                            if (failure != null) {
                                throw failure;
                            }
                        }
                    });
                }
                catch (RuntimeException e) {
                    this.logger.error("Transaction rolled back", e);
                    TopicPartition failedPartition = new TopicPartition(current.topic(), current.partition());
                    this.consumer.seek(failedPartition, current.offset());
                    // Stop here; nothing follows the loop.
                    return;
                }
            }
        }

        /**
         * Delivers each polled record to the listener outside any transaction.
         * The shared iterator is handed to {@code doInvokeRecordListener}, which may
         * consume the remaining records itself when handling an error.
         *
         * @param records the records returned by the poll
         * @throws Error if the listener's error handling throws an {@link Error}
         */
        private void doInvokeWithRecords(ConsumerRecords<K, V> records) throws Error {
            for (Iterator<ConsumerRecord<K, V>> remaining = records.iterator(); remaining.hasNext(); ) {
                ConsumerRecord<K, V> current = remaining.next();
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Processing " + current);
                }
                this.doInvokeRecordListener(current, null, remaining);
            }
        }

        /**
         * Invokes the record listener for a single record and performs the
         * follow-up acknowledgment and error handling.
         * <p>
         * On success: in record-ack mode the next offset is committed directly when
         * no producer is supplied, otherwise the record is queued on {@code acks};
         * in other container-managed modes (no manual ack, no auto-commit) the
         * record is queued. If a producer was supplied, the offsets are then sent to
         * its transaction.
         * <p>
         * On a listener {@link RuntimeException}: the record is acknowledged anyway
         * when ack-on-error is enabled, auto-commit is off and no producer is
         * supplied; without an {@code errorHandler} the exception is rethrown;
         * otherwise the handler is invoked.
         *
         * @param record the record to deliver
         * @param producer the transactional producer, or {@code null} when not in a Kafka transaction
         * @param iterator iterator over the records following {@code record}; drained when the
         * error handler is a {@code RemainingRecordsErrorHandler}
         * @return a {@link RuntimeException} thrown while handling a listener error
         * (the transactional caller in this file rethrows it), or {@code null}
         * @throws Error if the error handling path throws an {@link Error}
         */
        private RuntimeException doInvokeRecordListener(ConsumerRecord<K, V> record, Producer producer, Iterator<ConsumerRecord<K, V>> iterator) throws Error {
            try {
                // Pick the onMessage overload matching the listener type; the
                // acknowledging variants only get an Acknowledgment in a manual ack mode.
                switch (this.listenerType) {
                    case ACKNOWLEDGING_CONSUMER_AWARE: {
                        this.listener.onMessage(record, this.isAnyManualAck ? new ConsumerAcknowledgment(record) : null, this.consumer);
                        break;
                    }
                    case CONSUMER_AWARE: {
                        this.listener.onMessage(record, this.consumer);
                        break;
                    }
                    case ACKNOWLEDGING: {
                        this.listener.onMessage(record, this.isAnyManualAck ? new ConsumerAcknowledgment(record) : null);
                        break;
                    }
                    case SIMPLE: {
                        this.listener.onMessage(record);
                    }
                }
                if (this.isRecordAck) {
                    // Commit position is the offset of the next record to read.
                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L));
                    if (producer == null) {
                        if (this.containerProperties.isSyncCommits()) {
                            this.consumer.commitSync(offsetsToCommit);
                        } else {
                            this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
                        }
                    } else {
                        // With a producer, the offset goes out via sendOffsetsToTransaction below.
                        this.acks.add(record);
                    }
                } else if (!this.isAnyManualAck && !this.autoCommit) {
                    this.acks.add(record);
                }
                if (producer != null) {
                    this.sendOffsetsToTransaction(producer);
                }
            }
            catch (RuntimeException e) {
                // Listener failed: optionally acknowledge the record anyway (never when a producer is supplied).
                if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
                    if (this.isRecordAck) {
                        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L));
                        if (this.containerProperties.isSyncCommits()) {
                            this.consumer.commitSync(offsetsToCommit);
                        } else {
                            this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
                        }
                    } else if (!this.isAnyManualAck) {
                        this.acks.add(record);
                    }
                }
                if (this.errorHandler == null) {
                    throw e;
                }
                try {
                    if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
                        // Process pending commits first, then hand over the failed record
                        // together with everything still left in the iterator.
                        this.processCommits();
                        ArrayList records = new ArrayList();
                        records.add(record);
                        while (iterator.hasNext()) {
                            records.add(iterator.next());
                        }
                        ((RemainingRecordsErrorHandler)this.errorHandler).handle((Exception)e, records, (Consumer<?, ?>)this.consumer);
                    } else {
                        this.errorHandler.handle(e, record, this.consumer);
                        if (producer != null) {
                            try {
                                this.sendOffsetsToTransaction(producer);
                            }
                            catch (Exception e1) {
                                // Logged only; a failure here is not propagated to the caller.
                                this.logger.error((Object)"Send offsets to transaction failed", (Throwable)e1);
                            }
                        }
                    }
                }
                catch (RuntimeException ee) {
                    // Returned rather than thrown; the caller decides whether to rethrow.
                    this.logger.error((Object)"Error handler threw an exception", (Throwable)ee);
                    return ee;
                }
                catch (Error er) {
                    this.logger.error((Object)"Error handler threw an error", (Throwable)er);
                    throw er;
                }
            }
            return null;
        }

        /**
         * Processes the queued acks, builds the resulting commit map and sends it
         * to the producer's transaction for this container's consumer group.
         *
         * @param producer the transactional producer that receives the offsets
         */
        private void sendOffsetsToTransaction(Producer producer) {
            this.handleAcks();
            producer.sendOffsetsToTransaction(this.buildCommits(), this.consumerGroupId);
        }

        /**
         * Processes queued acks and commits pending offsets when the configured
         * ack mode calls for it.
         * <p>
         * The number of queued acks is added to {@code count} before the queue is
         * drained. Nothing further happens in manual-immediate mode. Otherwise:
         * <ul>
         * <li>manual, batch and record ack modes, or COUNT mode once the count has
         * reached the configured ack count: commit and reset {@code count};</li>
         * <li>TIME mode once more than the configured ack time has elapsed since
         * {@code last}: commit and update {@code last};</li>
         * <li>COUNT_TIME mode when either condition holds: commit, update
         * {@code last} and reset {@code count}.</li>
         * </ul>
         */
        private void processCommits() {
            this.count += this.acks.size();
            this.handleAcks();
            AbstractMessageListenerContainer.AckMode ackMode = this.containerProperties.getAckMode();
            if (!this.isManualImmediateAck) {
                if (!this.isManualAck) {
                    this.updatePendingOffsets();
                }
                boolean countExceeded = this.count >= this.containerProperties.getAckCount();
                if (this.isManualAck || this.isBatchAck || this.isRecordAck || ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT) && countExceeded) {
                    if (this.logger.isDebugEnabled() && ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT)) {
                        this.logger.debug("Committing in AckMode.COUNT because count " + this.count + " exceeds configured limit of " + this.containerProperties.getAckCount());
                    }
                    this.commitIfNecessary();
                    this.count = 0;
                } else {
                    long now = System.currentTimeMillis();
                    boolean elapsed = now - this.last > this.containerProperties.getAckTime();
                    if (ackMode.equals(AbstractMessageListenerContainer.AckMode.TIME) && elapsed) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Committing in AckMode.TIME because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime());
                        }
                        this.commitIfNecessary();
                        this.last = now;
                    } else if (ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
                        if (this.logger.isDebugEnabled()) {
                            if (elapsed) {
                                this.logger.debug("Committing in AckMode.COUNT_TIME because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime());
                            } else {
                                // Trailing space added so the limit is not glued to "of".
                                this.logger.debug("Committing in AckMode.COUNT_TIME because count " + this.count + " exceeds configured limit of " + this.containerProperties.getAckCount());
                            }
                        }
                        this.commitIfNecessary();
                        this.last = now;
                        this.count = 0;
                    }
                }
            }
        }

        /**
         * Drains the {@code seeks} queue and applies each request to the consumer:
         * an absolute seek when no position is set, otherwise a seek to the
         * beginning or the end of the partition. A failing seek is logged and the
         * next request is processed.
         */
        private void processSeeks() {
            TopicPartitionInitialOffset request;
            while ((request = (TopicPartitionInitialOffset)this.seeks.poll()) != null) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Seek: " + request);
                }
                try {
                    TopicPartitionInitialOffset.SeekPosition where = request.getPosition();
                    if (where == null) {
                        this.consumer.seek(request.topicPartition(), request.initialOffset().longValue());
                    } else if (where.equals(TopicPartitionInitialOffset.SeekPosition.BEGINNING)) {
                        this.consumer.seekToBeginning(Collections.singletonList(request.topicPartition()));
                    } else {
                        this.consumer.seekToEnd(Collections.singletonList(request.topicPartition()));
                    }
                }
                catch (Exception e) {
                    this.logger.error("Exception while seeking " + request, e);
                }
            }
        }

        /**
         * Seeks every explicitly defined partition that has a configured offset to
         * its initial position.
         * <ul>
         * <li>Negative offset: measured back from the end of the partition (or from
         * the current position when flagged relative-to-current), never below 0.</li>
         * <li>Non-negative offset flagged relative-to-current: added to the current
         * position.</li>
         * <li>Otherwise: used as an absolute offset.</li>
         * </ul>
         * A failing seek is logged and the remaining partitions are still processed.
         */
        private void initPartitionsIfNeeded() {
            for (Map.Entry<TopicPartition, OffsetMetadata> defined : this.definedPartitions.entrySet()) {
                TopicPartition partition = defined.getKey();
                OffsetMetadata meta = defined.getValue();
                Long requested = meta.offset;
                if (requested != null) {
                    long target = requested;
                    if (requested < 0L) {
                        if (!meta.relativeToCurrent) {
                            this.consumer.seekToEnd(Arrays.asList(partition));
                        }
                        target = Math.max(0L, this.consumer.position(partition) + requested);
                    } else if (meta.relativeToCurrent) {
                        target = this.consumer.position(partition) + requested;
                    }
                    try {
                        this.consumer.seek(partition, target);
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Reset " + partition + " to offset " + target);
                        }
                    }
                    catch (Exception e) {
                        this.logger.error("Failed to set initial offset for " + partition + " at " + target + ". Position is " + this.consumer.position(partition), e);
                    }
                }
            }
        }

        /**
         * Drains the {@code acks} queue and records each record's offset as
         * pending via {@link #addOffset(ConsumerRecord)}.
         */
        private void updatePendingOffsets() {
            ConsumerRecord<K, V> queued;
            while ((queued = (ConsumerRecord<K, V>)this.acks.poll()) != null) {
                this.addOffset(queued);
            }
        }

        /**
         * Records the given record's offset as pending for its topic/partition,
         * keeping the larger value when an offset is already recorded.
         *
         * @param record the record whose offset is recorded
         */
        private void addOffset(ConsumerRecord<K, V> record) {
            Map<Integer, Long> partitionOffsets = this.offsets.computeIfAbsent(record.topic(), topic -> new HashMap<>());
            partitionOffsets.merge(record.partition(), record.offset(), Long::max);
        }

        /**
         * Builds the commit map from the pending offsets and, when it is not empty,
         * commits it synchronously or asynchronously depending on
         * {@code containerProperties.isSyncCommits()}. A {@link WakeupException}
         * raised by the commit is logged at debug level and not propagated.
         */
        private void commitIfNecessary() {
            Map<TopicPartition, OffsetAndMetadata> pending = this.buildCommits();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Commit list: " + pending);
            }
            if (pending.isEmpty()) {
                return;
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Committing: " + pending);
            }
            try {
                if (this.containerProperties.isSyncCommits()) {
                    this.consumer.commitSync(pending);
                } else {
                    this.consumer.commitAsync(pending, this.commitCallback);
                }
            }
            catch (WakeupException e) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Woken up during commit");
                }
            }
        }

        /**
         * Converts the pending offsets into a commit map (each committed position
         * is the recorded offset + 1) and clears the pending offsets.
         *
         * @return the offsets to commit, keyed by topic/partition; empty when nothing is pending
         */
        private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
            Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
            for (Map.Entry<String, Map<Integer, Long>> byTopic : this.offsets.entrySet()) {
                String topic = byTopic.getKey();
                for (Map.Entry<Integer, Long> byPartition : byTopic.getValue().entrySet()) {
                    TopicPartition partition = new TopicPartition(topic, byPartition.getKey());
                    result.put(partition, new OffsetAndMetadata(byPartition.getValue() + 1L));
                }
            }
            this.offsets.clear();
            return result;
        }

        /**
         * Selects, for each topic/partition present in {@code records}, the record
         * with the highest offset.
         *
         * @param records the records to scan
         * @return one record per topic/partition; empty when {@code records} is empty
         */
        private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(List<ConsumerRecord<K, V>> records) {
            // Parameterized so the comparison on offset() is type-checked and the
            // returned collection needs no unchecked conversion.
            Map<TopicPartition, ConsumerRecord<K, V>> highestOffsetMap = new HashMap<>();
            for (ConsumerRecord<K, V> candidate : records) {
                TopicPartition partition = new TopicPartition(candidate.topic(), candidate.partition());
                highestOffsetMap.merge(partition, candidate, (current, next) -> next.offset() > current.offset() ? next : current);
            }
            return highestOffsetMap.values();
        }

        /**
         * Queues a request to seek the given topic/partition to an absolute
         * offset; the request is applied when the {@code seeks} queue is processed.
         *
         * @param topic the topic name
         * @param partition the partition number
         * @param offset the offset to seek to
         */
        @Override
        public void seek(String topic, int partition, long offset) {
            TopicPartitionInitialOffset request = new TopicPartitionInitialOffset(topic, partition, offset);
            this.seeks.add(request);
        }

        /**
         * Queues a request to seek the given topic/partition to its beginning;
         * the request is applied when the {@code seeks} queue is processed.
         *
         * @param topic the topic name
         * @param partition the partition number
         */
        @Override
        public void seekToBeginning(String topic, int partition) {
            TopicPartitionInitialOffset request = new TopicPartitionInitialOffset(topic, partition, TopicPartitionInitialOffset.SeekPosition.BEGINNING);
            this.seeks.add(request);
        }

        /**
         * Queues a request to seek the given topic/partition to its end; the
         * request is applied when the {@code seeks} queue is processed.
         *
         * @param topic the topic name
         * @param partition the partition number
         */
        @Override
        public void seekToEnd(String topic, int partition) {
            TopicPartitionInitialOffset request = new TopicPartitionInitialOffset(topic, partition, TopicPartitionInitialOffset.SeekPosition.END);
            this.seeks.add(request);
        }

        /**
         * {@link Acknowledgment} handed to batch listeners. Acknowledging it
         * acknowledges the highest-offset record of each topic/partition in the
         * batch.
         */
        private final class ConsumerBatchAcknowledgment
        implements Acknowledgment {
            /** Private copy of the batch this acknowledgment covers. */
            private final List<ConsumerRecord<K, V>> records;

            ConsumerBatchAcknowledgment(List<ConsumerRecord<K, V>> records) {
                // Copy so later changes to the caller's list do not affect the ack.
                this.records = new LinkedList<>(records);
            }

            /**
             * Acknowledges the batch.
             *
             * @throws IllegalStateException presumably, via {@code Assert.state}, when
             * the container is not in a manual ack mode - confirm against Assert
             */
            @Override
            public void acknowledge() {
                Assert.state(ListenerConsumer.this.isAnyManualAck, "A manual ackmode is required for an acknowledging listener");
                Collection<ConsumerRecord<K, V>> highest = ListenerConsumer.this.getHighestOffsetRecords(this.records);
                for (ConsumerRecord<K, V> last : highest) {
                    ListenerConsumer.this.processAck(last);
                }
            }

            @Override
            public String toString() {
                return "Acknowledgment for " + this.records;
            }
        }

        /**
         * {@link Acknowledgment} handed to record listeners. Acknowledging it
         * passes the wrapped record to {@code processAck}.
         */
        private final class ConsumerAcknowledgment
        implements Acknowledgment {
            /** The record this acknowledgment covers. */
            private final ConsumerRecord<K, V> record;

            ConsumerAcknowledgment(ConsumerRecord<K, V> record) {
                this.record = record;
            }

            /**
             * Acknowledges the record.
             *
             * @throws IllegalStateException presumably, via {@code Assert.state}, when
             * the container is not in a manual ack mode - confirm against Assert
             */
            @Override
            public void acknowledge() {
                boolean manualAck = ListenerConsumer.this.isAnyManualAck;
                Assert.state(manualAck, "A manual ackmode is required for an acknowledging listener");
                ListenerConsumer.this.processAck(this.record);
            }

            @Override
            public String toString() {
                return "Acknowledgment for " + this.record;
            }
        }
    }
}

