/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaResourceHolder;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.event.ConsumerFailedToStartEvent;
import org.springframework.kafka.event.ConsumerPausedEvent;
import org.springframework.kafka.event.ConsumerResumedEvent;
import org.springframework.kafka.event.ConsumerStartedEvent;
import org.springframework.kafka.event.ConsumerStartingEvent;
import org.springframework.kafka.event.ConsumerStoppedEvent;
import org.springframework.kafka.event.ConsumerStoppingEvent;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerAwareBatchErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DelegatingMessageListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.GenericErrorHandler;
import org.springframework.kafka.listener.GenericMessageListener;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.listener.ListenerType;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.listener.LoggingCommitCallback;
import org.springframework.kafka.listener.LoggingErrorHandler;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.RemainingRecordsErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.LogIfLevelEnabled;
import org.springframework.kafka.support.SeekUtils;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.TransactionSupport;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class KafkaMessageListenerContainer<K, V>
extends AbstractMessageListenerContainer<K, V> {
    // Literal "unused"; presumably a @SuppressWarnings key (no usage is visible in this part of the file).
    private static final String UNUSED = "unused";
    // Fallback ack time: checkAckMode() applies this value when a time-based ack mode is configured
    // with an ack time of 0. Presumably milliseconds - confirm against ContainerProperties#setAckTime.
    private static final int DEFAULT_ACK_TIME = 5000;
    // True when the Micrometer MeterRegistry class can be loaded through this class's class loader.
    private static final boolean MICROMETER_PRESENT = ClassUtils.isPresent((String)"io.micrometer.core.instrument.MeterRegistry", (ClassLoader)KafkaMessageListenerContainer.class.getClassLoader());
    // The container handed to the constructor, or this instance when null was passed; supplied as the
    // container argument of every published event and returned by parentOrThis().
    private final AbstractMessageListenerContainer<K, V> thisOrParentContainer;
    // Explicit topic/partition assignments given to the constructor (copied), otherwise
    // containerProperties.getTopicPartitionsToAssign().
    private final TopicPartitionOffset[] topicPartitions;
    // Passed to consumerFactory.createConsumer(); doStart() only calls checkTopics() while this is null.
    private String clientIdSuffix;
    // Defaults to stopping this container with a no-op callback; replaceable via setEmergencyStop().
    private Runnable emergencyStop = () -> this.stop(() -> {});
    // Created on each doStart(); read by other threads, hence volatile.
    private volatile ListenerConsumer listenerConsumer;
    // Future returned by the consumer task executor in doStart(); doStop() attaches a StopCallback to it.
    private volatile ListenableFuture<?> listenerConsumerFuture;
    // Re-created in doStart() and awaited there; counted down by publishConsumerStartingEvent().
    private volatile CountDownLatch startLatch = new CountDownLatch(1);

    /**
     * Create a container with no parent container and no explicit partition
     * assignment; partitions to assign, if any, are taken from the container
     * properties.
     * @param consumerFactory the consumer factory; must not be null.
     * @param containerProperties the container properties.
     */
    public KafkaMessageListenerContainer(
            ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties) {

        // The cast selects the TopicPartitionOffset... overload over the TopicPartitionInitialOffset... one.
        this(null, consumerFactory, containerProperties, (TopicPartitionOffset[]) null);
    }

    /**
     * Create a container with no parent container and the supplied explicit
     * topic/partition assignments.
     * @param consumerFactory the consumer factory; must not be null.
     * @param containerProperties the container properties.
     * @param topicPartitions the partitions to assign; when null the value from
     * the container properties is used instead.
     * @deprecated marked deprecated in this class; the two-argument constructor
     * reads the assignments from the container properties.
     */
    @Deprecated
    public KafkaMessageListenerContainer(
            ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties,
            TopicPartitionOffset... topicPartitions) {

        this(null, consumerFactory, containerProperties, topicPartitions);
    }

    /**
     * Create a container that reports the given container as its parent and
     * has no explicit partition assignment.
     * @param container the parent container, or null to use this instance.
     * @param consumerFactory the consumer factory; must not be null.
     * @param containerProperties the container properties.
     */
    KafkaMessageListenerContainer(
            AbstractMessageListenerContainer<K, V> container,
            ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties) {

        // The cast selects the TopicPartitionOffset... overload over the TopicPartitionInitialOffset... one.
        this(container, consumerFactory, containerProperties, (TopicPartitionOffset[]) null);
    }

    /**
     * Create a container from legacy {@link TopicPartitionInitialOffset}
     * assignments, each of which is converted with
     * {@link TopicPartitionInitialOffset#toTPO()}.
     * @param container the parent container, or null to use this instance.
     * @param consumerFactory the consumer factory; must not be null.
     * @param containerProperties the container properties.
     * @param topicPartitions the partitions to assign; when null the value from
     * the container properties is used instead.
     * @deprecated marked deprecated in this class; a sibling constructor accepts
     * {@link TopicPartitionOffset} values directly.
     */
    @Deprecated
    KafkaMessageListenerContainer(
            AbstractMessageListenerContainer<K, V> container,
            ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties,
            TopicPartitionInitialOffset... topicPartitions) {

        super(consumerFactory, containerProperties);
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
        if (container != null) {
            this.thisOrParentContainer = container;
        }
        else {
            this.thisOrParentContainer = this;
        }
        if (topicPartitions == null) {
            this.topicPartitions = containerProperties.getTopicPartitionsToAssign();
        }
        else {
            this.topicPartitions = Arrays.stream(topicPartitions)
                    .map(TopicPartitionInitialOffset::toTPO)
                    .toArray(TopicPartitionOffset[]::new);
        }
    }

    /**
     * Create a container with an optional parent and optional explicit
     * topic/partition assignments.
     * @param container the parent container, or null to use this instance.
     * @param consumerFactory the consumer factory; must not be null.
     * @param containerProperties the container properties.
     * @param topicPartitions the partitions to assign (copied); when null the
     * value from the container properties is used instead.
     */
    KafkaMessageListenerContainer(
            AbstractMessageListenerContainer<K, V> container,
            ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties,
            TopicPartitionOffset... topicPartitions) {

        super(consumerFactory, containerProperties);
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
        if (container != null) {
            this.thisOrParentContainer = container;
        }
        else {
            this.thisOrParentContainer = this;
        }
        if (topicPartitions == null) {
            this.topicPartitions = containerProperties.getTopicPartitionsToAssign();
        }
        else {
            // Copy so later changes to the caller's array are not seen here.
            this.topicPartitions = Arrays.copyOf(topicPartitions, topicPartitions.length);
        }
    }

    /**
     * Replace the emergency-stop action. The default action stops this
     * container with a no-op callback.
     * @param emergencyStop the action to run; must not be null.
     * @throws IllegalArgumentException if {@code emergencyStop} is null.
     */
    public void setEmergencyStop(Runnable emergencyStop) {
        Assert.notNull(emergencyStop, "'emergencyStop' cannot be null");
        this.emergencyStop = emergencyStop;
    }

    /**
     * Set the suffix handed to the consumer factory when the consumer is
     * created. While it is null, {@code doStart()} also runs
     * {@code checkTopics()}.
     * @param clientIdSuffix the suffix; may be null.
     */
    public void setClientIdSuffix(String clientIdSuffix) {
        this.clientIdSuffix = clientIdSuffix;
    }

    /**
     * Return the partitions currently associated with the consumer.
     * Explicitly defined partitions take precedence over assigned ones.
     * @return an unmodifiable view of the partitions, or null when no consumer
     * exists yet or it has neither defined nor assigned partitions.
     */
    @Override
    @Nullable
    public Collection<TopicPartition> getAssignedPartitions() {
        // Work on one snapshot of the volatile field.
        ListenerConsumer current = this.listenerConsumer;
        if (current == null) {
            return null;
        }
        if (current.definedPartitions != null) {
            return Collections.unmodifiableCollection(current.definedPartitions.keySet());
        }
        if (current.assignedPartitions == null) {
            return null;
        }
        return Collections.unmodifiableCollection(current.assignedPartitions);
    }

    /**
     * Report whether a pause was requested and the consumer has acted on it.
     * @return true only when {@code isPaused()} is true, a listener consumer
     * exists, and that consumer reports itself paused.
     */
    @Override
    public boolean isContainerPaused() {
        if (!isPaused()) {
            return false;
        }
        ListenerConsumer current = this.listenerConsumer;
        return current != null && current.isConsumerPaused();
    }

    /**
     * Return the Kafka consumer's metrics keyed by its client id.
     * <p>The client id is read from the {@code client-id} tag of the first
     * metric name returned by the consumer.
     * @return a single-entry map of client id to metrics, or an empty map when
     * no consumer exists yet or the consumer reports no metrics.
     */
    @Override
    public Map<String, Map<MetricName, ? extends Metric>> metrics() {
        // Work on one snapshot of the volatile field.
        ListenerConsumer listenerConsumerForMetrics = this.listenerConsumer;
        if (listenerConsumerForMetrics != null) {
            Map<MetricName, ? extends Metric> metrics = listenerConsumerForMetrics.consumer.metrics();
            Iterator<MetricName> metricIterator = metrics.keySet().iterator();
            if (metricIterator.hasNext()) {
                String clientId = metricIterator.next().tags().get("client-id");
                return Collections.singletonMap(clientId, metrics);
            }
        }
        return Collections.emptyMap();
    }

    /**
     * Start the container: validate the ack configuration, create the
     * listener consumer, submit it to the consumer task executor and wait for
     * the consumer thread to signal that it is starting.
     * <p>Does nothing when the container is already running. If the start
     * latch is not released within the configured consumer start timeout, an
     * error is logged and a failed-to-start event is published; the container
     * stays marked as running.
     */
    @Override
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        if (this.clientIdSuffix == null) {
            checkTopics();
        }
        ContainerProperties props = getContainerProperties();
        checkAckMode(props);
        Object configuredListener = props.getMessageListener();
        if (props.getConsumerTaskExecutor() == null) {
            // No executor configured: install one whose thread names start with "<beanName>-C-".
            String beanName = getBeanName();
            String threadNamePrefix = (beanName != null ? beanName : "") + "-C-";
            props.setConsumerTaskExecutor(new SimpleAsyncTaskExecutor(threadNamePrefix));
        }
        GenericMessageListener<?> listener = (GenericMessageListener<?>) configuredListener;
        ListenerType listenerType = determineListenerType(listener);
        this.listenerConsumer = new ListenerConsumer(listener, listenerType);
        setRunning(true);
        // Fresh latch for this start; released by publishConsumerStartingEvent().
        this.startLatch = new CountDownLatch(1);
        this.listenerConsumerFuture = props.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);
        try {
            long timeoutMillis = props.getConsumerStartTimout().toMillis();
            boolean started = this.startLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!started) {
                this.logger.error("Consumer thread failed to start - does the configured task executor have enough threads to support all containers and concurrency?");
                publishConsumerFailedToStart();
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Validate and complete the ack settings when the consumer factory is not
     * configured for auto commit.
     * <ul>
     * <li>For {@code COUNT} and {@code COUNT_TIME} the ack count must be
     * positive.</li>
     * <li>For {@code TIME} and {@code COUNT_TIME} an ack time of 0 is replaced
     * with {@link #DEFAULT_ACK_TIME}.</li>
     * </ul>
     * @param containerProperties the properties to check; its ack time may be
     * modified.
     * @throws IllegalStateException if a count-based mode has an ack count
     * that is not greater than 0.
     */
    private void checkAckMode(ContainerProperties containerProperties) {
        if (this.consumerFactory.isAutoCommit()) {
            return;
        }
        ContainerProperties.AckMode ackMode = containerProperties.getAckMode();
        boolean countTime = ackMode.equals(ContainerProperties.AckMode.COUNT_TIME);
        if (countTime || ackMode.equals(ContainerProperties.AckMode.COUNT)) {
            Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
        }
        if ((countTime || ackMode.equals(ContainerProperties.AckMode.TIME))
                && containerProperties.getAckTime() == 0L) {
            // Use the named constant rather than repeating the literal 5000.
            containerProperties.setAckTime(DEFAULT_ACK_TIME);
        }
    }

    /**
     * Determine the listener type, looking through any chain of
     * {@link DelegatingMessageListener} wrappers to the innermost delegate.
     * @param listener the configured listener.
     * @return the type reported by {@link ListenerUtils#determineListenerType}
     * for the listener itself, or for its innermost delegate when the listener
     * is a delegating one.
     */
    private ListenerType determineListenerType(GenericMessageListener<?> listener) {
        ListenerType listenerType = ListenerUtils.determineListenerType(listener);
        if (listener instanceof DelegatingMessageListener) {
            // Walk the chain as Object: a delegate need not be a GenericMessageListener.
            Object delegating = listener;
            while (delegating instanceof DelegatingMessageListener) {
                delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();
            }
            listenerType = ListenerUtils.determineListenerType(delegating);
        }
        return listenerType;
    }

    /**
     * Stop the container if it is running: register the completion callback
     * on the consumer future, mark the container as not running, then wake the
     * consumer if necessary.
     * @param callback run by the {@code StopCallback} once the consumer task
     * has completed, successfully or not; may be null.
     */
    @Override
    protected void doStop(Runnable callback) {
        if (!isRunning()) {
            return;
        }
        // Register before flipping the running flag so the callback is attached to the live future.
        this.listenerConsumerFuture.addCallback(new StopCallback(callback));
        setRunning(false);
        this.listenerConsumer.wakeIfNecessary();
    }

    /**
     * Publish a {@link ListenerContainerIdleEvent} if an event publisher is
     * available.
     * @param idleTime the idle time to report.
     * @param consumer the consumer to include in the event.
     * @param paused the paused flag to include in the event.
     */
    private void publishIdleContainerEvent(long idleTime, Consumer<?, ?> consumer, boolean paused) {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        ApplicationEvent event = new ListenerContainerIdleEvent(this, this.thisOrParentContainer, idleTime,
                getBeanName(), getAssignedPartitions(), consumer, paused);
        getApplicationEventPublisher().publishEvent(event);
    }

    /**
     * Publish a {@link NonResponsiveConsumerEvent} if an event publisher is
     * available.
     * @param timeSinceLastPoll the elapsed time since the last poll to report.
     * @param consumer the consumer to include in the event.
     */
    private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<?, ?> consumer) {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        ApplicationEvent event = new NonResponsiveConsumerEvent(this, this.thisOrParentContainer,
                timeSinceLastPoll, getBeanName(), getAssignedPartitions(), consumer);
        getApplicationEventPublisher().publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerPausedEvent} if an event publisher is
     * available.
     * @param partitions the partitions to report; exposed to listeners through
     * an unmodifiable view.
     */
    private void publishConsumerPausedEvent(Collection<TopicPartition> partitions) {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        ApplicationEvent event = new ConsumerPausedEvent(this, this.thisOrParentContainer,
                Collections.unmodifiableCollection(partitions));
        getApplicationEventPublisher().publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerResumedEvent} if an event publisher is
     * available.
     * @param partitions the partitions to report; exposed to listeners through
     * an unmodifiable view.
     */
    private void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        ApplicationEvent event = new ConsumerResumedEvent(this, this.thisOrParentContainer,
                Collections.unmodifiableCollection(partitions));
        getApplicationEventPublisher().publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerStoppingEvent} if an event publisher is
     * available. Any exception raised while building or publishing the event
     * is logged and suppressed.
     * @param consumer the consumer to include in the event.
     */
    private void publishConsumerStoppingEvent(Consumer<?, ?> consumer) {
        try {
            if (getApplicationEventPublisher() == null) {
                return;
            }
            ApplicationEvent event = new ConsumerStoppingEvent(this, this.thisOrParentContainer, consumer,
                    getAssignedPartitions());
            getApplicationEventPublisher().publishEvent(event);
        }
        catch (Exception e) {
            this.logger.error(e, "Failed to publish consumer stopping event");
        }
    }

    /**
     * Publish a {@link ConsumerStoppedEvent} if an event publisher is
     * available.
     */
    private void publishConsumerStoppedEvent() {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        ApplicationEvent event = new ConsumerStoppedEvent(this, this.thisOrParentContainer);
        getApplicationEventPublisher().publishEvent(event);
    }

    /**
     * Release the start latch awaited by {@code doStart()}, then publish a
     * {@link ConsumerStartingEvent} if an event publisher is available.
     */
    private void publishConsumerStartingEvent() {
        // Always release the latch, even when there is nobody to publish to.
        this.startLatch.countDown();
        if (getApplicationEventPublisher() == null) {
            return;
        }
        ApplicationEvent event = new ConsumerStartingEvent(this, this.thisOrParentContainer);
        getApplicationEventPublisher().publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerStartedEvent} if an event publisher is
     * available.
     */
    private void publishConsumerStartedEvent() {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        ApplicationEvent event = new ConsumerStartedEvent(this, this.thisOrParentContainer);
        getApplicationEventPublisher().publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerFailedToStartEvent} if an event publisher is
     * available.
     */
    private void publishConsumerFailedToStart() {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        ApplicationEvent event = new ConsumerFailedToStartEvent(this, this.thisOrParentContainer);
        getApplicationEventPublisher().publishEvent(event);
    }

    /**
     * Return the container supplied at construction time, or this container
     * when none was supplied.
     * @return the parent container or this instance.
     */
    @Override
    protected AbstractMessageListenerContainer<?, ?> parentOrThis() {
        AbstractMessageListenerContainer<K, V> parentOrSelf = this.thisOrParentContainer;
        return parentOrSelf;
    }

    /**
     * Describe this container: bean name, client id suffix (when set) and the
     * current partitions, or "none assigned" when there are none.
     * @return the description.
     */
    @Override
    public String toString() {
        // Read once so the null check and the printed value cannot disagree.
        Collection<TopicPartition> partitions = getAssignedPartitions();
        return "KafkaMessageListenerContainer [id=" + getBeanName()
                + (this.clientIdSuffix != null ? ", clientIndex=" + this.clientIdSuffix : "")
                + ", topicPartitions=" + (partitions == null ? "none assigned" : partitions)
                + "]";
    }

    /**
     * Holds the Micrometer registry and the success/failure timers registered
     * for one listener, and removes those timers again on {@link #destroy()}.
     */
    private static final class MicrometerHolder {
        /** Every timer registered by this holder, so that destroy() can remove them. */
        private final Set<Timer> meters = ConcurrentHashMap.newKeySet();
        private final MeterRegistry registry;
        private final Timer successTimer;
        private final Timer failTimer;

        /**
         * Look up the single {@link MeterRegistry} bean and register the two timers.
         * @param context the application context to search; may be null.
         * @param name value of the timers' {@code name} tag.
         * @param tags additional tags to add to both timers; may be null or empty.
         * @throws IllegalStateException if the context is null or does not
         * contain exactly one {@link MeterRegistry} bean.
         */
        MicrometerHolder(@Nullable ApplicationContext context, String name, Map<String, String> tags) {
            if (context == null) {
                throw new IllegalStateException("No micrometer registry present");
            }
            Map<String, MeterRegistry> registries = context.getBeansOfType(MeterRegistry.class, false, false);
            if (registries.isEmpty()) {
                throw new IllegalStateException("No micrometer registry present");
            }
            if (registries.size() > 1) {
                // Report the ambiguous case accurately instead of claiming that none is present.
                throw new IllegalStateException("More than one micrometer registry present: " + registries.keySet());
            }
            this.registry = registries.values().iterator().next();
            this.successTimer = buildTimer(true, name, "none", tags);
            this.failTimer = buildTimer(false, name, "ListenerExecutionFailedException", tags);
        }

        /**
         * Start a timing sample against the registry.
         * @return the started {@link Timer.Sample}, typed as Object.
         */
        Object start() {
            return Timer.start(this.registry);
        }

        /**
         * Stop the sample and record it on the success timer.
         * @param sample a value previously returned by {@link #start()}.
         */
        void success(Object sample) {
            ((Timer.Sample) sample).stop(this.successTimer);
        }

        /**
         * Stop the sample and record it on the failure timer.
         * @param sample a value previously returned by {@link #start()}.
         */
        void failure(Object sample) {
            ((Timer.Sample) sample).stop(this.failTimer);
        }

        /**
         * Build and register one {@code spring.kafka.listener} timer and remember it for removal.
         * @param result true for the success timer, false for the failure timer.
         * @param name value of the {@code name} tag.
         * @param exception value of the {@code exception} tag.
         * @param tags additional tags; may be null or empty.
         * @return the registered timer.
         */
        private Timer buildTimer(boolean result, String name, String exception, Map<String, String> tags) {
            Timer.Builder builder = Timer.builder("spring.kafka.listener")
                    .description("Kafka Listener Timer")
                    .tag("name", name)
                    .tag("result", result ? "success" : "failure")
                    .tag("exception", exception);
            if (tags != null && !tags.isEmpty()) {
                tags.forEach((key, value) -> builder.tag(key, value));
            }
            Timer registeredTimer = builder.register(this.registry);
            this.meters.add(registeredTimer);
            return registeredTimer;
        }

        /** Remove every timer registered by this holder from the registry. */
        void destroy() {
            this.meters.forEach(meter -> this.registry.remove(meter));
            this.meters.clear();
        }
    }

    private class StopCallback
    implements ListenableFutureCallback<Object> {
        private final Runnable callback;

        StopCallback(Runnable callback) {
            this.callback = callback;
        }

        public void onFailure(Throwable e) {
            KafkaMessageListenerContainer.this.logger.error(e, (CharSequence)"Error while stopping the container: ");
            if (this.callback != null) {
                this.callback.run();
            }
        }

        public void onSuccess(Object result) {
            KafkaMessageListenerContainer.this.logger.debug(() -> KafkaMessageListenerContainer.this + " stopped normally");
            if (this.callback != null) {
                this.callback.run();
            }
        }
    }

    /**
     * Immutable holder for the offset details of one partition: an offset, a
     * flag saying whether it is relative to the current position, and a seek
     * position. How these values are applied is not visible in this part of
     * the file.
     */
    private static final class OffsetMetadata {
        /** The offset; boxed, so presumably allowed to be null - confirm against the usages. */
        private final Long offset;
        /** Whether {@link #offset} is relative to the current position. */
        private final boolean relativeToCurrent;
        /** The seek position to use. */
        private final TopicPartitionOffset.SeekPosition seekPosition;

        /**
         * @param offset the offset.
         * @param relativeToCurrent whether the offset is relative to the current position.
         * @param seekPosition the seek position.
         */
        OffsetMetadata(final Long offset, final boolean relativeToCurrent,
                final TopicPartitionOffset.SeekPosition seekPosition) {

            this.seekPosition = seekPosition;
            this.relativeToCurrent = relativeToCurrent;
            this.offset = offset;
        }
    }

    private final class ListenerConsumer
    implements SchedulingAwareRunnable,
    ConsumerSeekAware.ConsumerSeekCallback {
        // Numeric constant 60; its usage is outside the visible part of this class.
        private static final int SIXTY = 60;
        // String constants, presumably @SuppressWarnings keys (usages not visible here).
        // NOTE(review): RAWTYPES and RAW_TYPES carry the same value.
        private static final String UNCHECKED = "unchecked";
        private static final String RAWTYPES = "rawtypes";
        private static final String RAW_TYPES = "rawtypes";
        // Logger scoped to ListenerConsumer; distinct from the enclosing container's logger.
        private final LogAccessor logger = new LogAccessor(LogFactory.getLog(ListenerConsumer.class));
        // Container properties of the enclosing container, captured when this consumer is created.
        private final ContainerProperties containerProperties = KafkaMessageListenerContainer.this.getContainerProperties();
        // The configured commit callback, or a LoggingCommitCallback when none is configured.
        private final OffsetCommitCallback commitCallback = this.containerProperties.getCommitCallback() != null ? this.containerProperties.getCommitCallback() : new LoggingCommitCallback();
        // The Kafka consumer, created by the consumer factory in the constructor.
        private final Consumer<K, V> consumer;
        // Nested map of String -> Integer -> Long; presumably topic -> partition -> offset (confirm against usages).
        private final Map<String, Map<Integer, Long>> offsets = new HashMap<String, Map<Integer, Long>>();
        // The listener as supplied, plus typed views of it; exactly one of listener/batchListener is non-null.
        private final GenericMessageListener<?> genericListener;
        // The listener cast to ConsumerSeekAware, or null when it does not implement that interface.
        private final ConsumerSeekAware consumerSeekAwareListener;
        private final MessageListener<K, V> listener;
        private final BatchMessageListener<K, V> batchListener;
        private final ListenerType listenerType;
        // True for the CONSUMER_AWARE and ACKNOWLEDGING_CONSUMER_AWARE listener types.
        private final boolean isConsumerAwareListener;
        private final boolean isBatchListener;
        // For batch listeners, the value of wantsPollResult(); always false for record listeners.
        private final boolean wantsFullRecords;
        // Effective auto-commit setting computed by determineAutoCommit().
        private final boolean autoCommit;
        // Flags derived once from the configured AckMode.
        private final boolean isManualAck = this.containerProperties.getAckMode().equals((Object)ContainerProperties.AckMode.MANUAL);
        private final boolean isCountAck = this.containerProperties.getAckMode().equals((Object)ContainerProperties.AckMode.COUNT) || this.containerProperties.getAckMode().equals((Object)ContainerProperties.AckMode.COUNT_TIME);
        private final boolean isTimeOnlyAck = this.containerProperties.getAckMode().equals((Object)ContainerProperties.AckMode.TIME);
        private final boolean isManualImmediateAck = this.containerProperties.getAckMode().equals((Object)ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        private final boolean isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
        private final boolean isRecordAck = this.containerProperties.getAckMode().equals((Object)ContainerProperties.AckMode.RECORD);
        // Unbounded queues of records and seek requests; presumably filled by other threads and drained by the consumer thread (confirm).
        private final BlockingQueue<ConsumerRecord<K, V>> acks = new LinkedBlockingQueue();
        private final BlockingQueue<TopicPartitionOffset> seeks = new LinkedBlockingQueue<TopicPartitionOffset>();
        // Only one of these is taken from the container's configured handler (by listener kind); the other is a logging default.
        private final ErrorHandler errorHandler;
        private final BatchErrorHandler batchErrorHandler;
        // Transaction support; kafkaTxManager is non-null only when the manager is a KafkaAwareTransactionManager.
        private final PlatformTransactionManager transactionManager = this.containerProperties.getTransactionManager();
        private final KafkaAwareTransactionManager kafkaTxManager = this.transactionManager instanceof KafkaAwareTransactionManager ? (KafkaAwareTransactionManager)this.transactionManager : null;
        // Null when no transaction manager is configured.
        private final TransactionTemplate transactionTemplate;
        private final String consumerGroupId = KafkaMessageListenerContainer.this.getGroupId();
        // Scheduler running the periodic checkConsumer() task; monitorTask is that task's handle.
        private final TaskScheduler taskScheduler;
        private final ScheduledFuture<?> monitorTask;
        // Logs at the level configured by getCommitLogLevel().
        private final LogIfLevelEnabled commitLogger = new LogIfLevelEnabled(this.logger, this.containerProperties.getCommitLogLevel());
        // getPollTimeout() is interpreted as milliseconds here.
        private final Duration pollTimeout = Duration.ofMillis(this.containerProperties.getPollTimeout());
        // Results of checkDeserializer() for the key and value deserializer classes.
        private final boolean checkNullKeyForExceptions;
        private final boolean checkNullValueForExceptions;
        private final boolean syncCommits = this.containerProperties.isSyncCommits();
        private final Duration syncCommitTimeout;
        // The container's record interceptor is exposed through exactly one of these two fields,
        // chosen by isInterceptBeforeTx(); the other is null.
        private final RecordInterceptor<K, V> recordInterceptor = !KafkaMessageListenerContainer.this.isInterceptBeforeTx() ? KafkaMessageListenerContainer.this.getRecordInterceptor() : null;
        private final RecordInterceptor<K, V> earlyRecordInterceptor = KafkaMessageListenerContainer.this.isInterceptBeforeTx() ? KafkaMessageListenerContainer.this.getRecordInterceptor() : null;
        private final ConsumerSeekAware.ConsumerSeekCallback seekCallback = new InitialOrIdleSeekCallback();
        // Value resolved by obtainMaxPollInterval(), in milliseconds.
        private final long maxPollInterval;
        private final MicrometerHolder micrometerHolder;
        private final AtomicBoolean polling = new AtomicBoolean();
        private final boolean subBatchPerPartition = this.containerProperties.isSubBatchPerPartition();
        private final Duration authorizationExceptionRetryInterval = this.containerProperties.getAuthorizationExceptionRetryInterval();
        // Assignment commit option, and the decision derived from it by determineCommitCurrent().
        private final ContainerProperties.AssignmentCommitOption autoCommitOption = this.containerProperties.getAssignmentCommitOption();
        private final boolean commitCurrentOnAssignment;
        // Mutable state without volatile; presumably confined to the consumer thread (confirm against usages).
        private Map<TopicPartition, OffsetMetadata> definedPartitions;
        private int count;
        private long last = System.currentTimeMillis();
        private boolean fatalError;
        // Set to true in the constructor when the scheduler came from the container properties.
        private boolean taskSchedulerExplicitlySet;
        private long lastReceive;
        // Chained assignment: lastReceive and lastAlertAt both start at the same timestamp.
        private long lastAlertAt = this.lastReceive = System.currentTimeMillis();
        private long nackSleep = -1L;
        private int nackIndex;
        private Iterator<TopicPartition> batchIterator;
        private ConsumerRecords<K, V> lastBatch;
        // State read from other threads (for example by isContainerPaused() and getAssignedPartitions()).
        private volatile boolean consumerPaused;
        private volatile Collection<TopicPartition> assignedPartitions;
        private volatile Thread consumerThread;
        private volatile long lastPoll = System.currentTimeMillis();

        /**
         * Create the Kafka consumer, subscribe or assign its topics, and wire up
         * the listener, error handlers, monitor task and derived settings.
         * @param listener the message listener; must be a {@link MessageListener}
         * or a {@link BatchMessageListener}.
         * @param listenerType the listener type determined by the container.
         * @throws IllegalArgumentException if the listener is neither a record
         * nor a batch listener.
         * @throws IllegalStateException if a batch listener is combined with
         * {@code AckMode.RECORD}.
         */
        ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
            // The container's consumer property overrides become the *defaults* of this Properties
            // object: they are visible through getProperty() but not through Hashtable-style get().
            Properties consumerProperties = new Properties(this.containerProperties.getKafkaConsumerProperties());
            // May add enable.auto.commit=false to consumerProperties, so it runs before the consumer is created.
            this.autoCommit = this.determineAutoCommit(consumerProperties);
            this.consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(this.consumerGroupId, this.containerProperties.getClientId(), KafkaMessageListenerContainer.this.clientIdSuffix, consumerProperties);
            this.transactionTemplate = this.determineTransactionTemplate();
            this.genericListener = listener;
            this.consumerSeekAwareListener = this.checkConsumerSeekAware(listener);
            // Reads this.autoCommit, which was assigned above.
            this.commitCurrentOnAssignment = this.determineCommitCurrent(consumerProperties);
            this.subscribeOrAssignTopics(this.consumer);
            GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
            // Exactly one of listener/batchListener is set; any other listener kind is rejected.
            if (listener instanceof BatchMessageListener) {
                this.listener = null;
                this.batchListener = (BatchMessageListener)listener;
                this.isBatchListener = true;
                this.wantsFullRecords = this.batchListener.wantsPollResult();
            } else if (listener instanceof MessageListener) {
                this.listener = (MessageListener)listener;
                this.batchListener = null;
                this.isBatchListener = false;
                this.wantsFullRecords = false;
            } else {
                throw new IllegalArgumentException("Listener must be one of 'MessageListener', 'BatchMessageListener', or the variants that are consumer aware and/or Acknowledging not " + listener.getClass().getName());
            }
            this.listenerType = listenerType;
            // The local 'bl' is a decompiler artefact and is never read.
            boolean bl = this.isConsumerAwareListener = listenerType.equals((Object)ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || listenerType.equals((Object)ListenerType.CONSUMER_AWARE);
            // The configured handler is applied to the handler matching the listener kind;
            // the other handler is a logging default.
            if (this.isBatchListener) {
                this.validateErrorHandler(true);
                this.errorHandler = new LoggingErrorHandler();
                this.batchErrorHandler = this.determineBatchErrorHandler(errHandler);
            } else {
                this.validateErrorHandler(false);
                this.errorHandler = this.determineErrorHandler(errHandler);
                this.batchErrorHandler = new BatchLoggingErrorHandler();
            }
            Assert.state((!this.isBatchListener || !this.isRecordAck ? 1 : 0) != 0, (String)"Cannot use AckMode.RECORD with a batch listener");
            // Use the configured scheduler, or create and initialise a private ThreadPoolTaskScheduler.
            if (this.containerProperties.getScheduler() != null) {
                this.taskScheduler = this.containerProperties.getScheduler();
                this.taskSchedulerExplicitlySet = true;
            } else {
                ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
                threadPoolTaskScheduler.initialize();
                this.taskScheduler = threadPoolTaskScheduler;
            }
            // Run checkConsumer() at a fixed rate; getMonitorInterval() is interpreted as seconds here.
            this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer, Duration.ofSeconds(this.containerProperties.getMonitorInterval()));
            if (this.containerProperties.isLogContainerConfig()) {
                this.logger.info((CharSequence)this.toString());
            }
            Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
            this.checkNullKeyForExceptions = this.checkDeserializer(this.findDeserializerClass(props, false));
            this.checkNullValueForExceptions = this.checkDeserializer(this.findDeserializerClass(props, true));
            this.syncCommitTimeout = this.determineSyncCommitTimeout();
            // When no sync commit timeout was configured, write the determined value back to this
            // container's properties and to the parent-or-self container's properties.
            if (this.containerProperties.getSyncCommitTimeout() == null) {
                this.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);
                if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) {
                    KafkaMessageListenerContainer.this.thisOrParentContainer.getContainerProperties().setSyncCommitTimeout(this.syncCommitTimeout);
                }
            }
            this.maxPollInterval = this.obtainMaxPollInterval(consumerProperties);
            this.micrometerHolder = this.obtainMicrometerHolder();
        }

        /**
         * Decide whether the current position should be committed when
         * partitions are assigned.
         * <p>Never when auto commit is enabled or the option is {@code NEVER};
         * always for {@code ALWAYS}; for the {@code LATEST_ONLY} variants only
         * when {@code auto.offset.reset} is absent or {@code latest}.
         * @param consumerProperties the consumer property overrides.
         * @return true if the current position should be committed on assignment.
         */
        private boolean determineCommitCurrent(Properties consumerProperties) {
            ContainerProperties.AssignmentCommitOption option = this.autoCommitOption;
            if (this.autoCommit || ContainerProperties.AssignmentCommitOption.NEVER.equals(option)) {
                return false;
            }
            if (ContainerProperties.AssignmentCommitOption.ALWAYS.equals(option)) {
                return true;
            }
            boolean latestOnlyOption = ContainerProperties.AssignmentCommitOption.LATEST_ONLY.equals(option)
                    || ContainerProperties.AssignmentCommitOption.LATEST_ONLY_NO_TX.equals(option);
            if (!latestOnlyOption) {
                return false;
            }
            String autoOffsetReset = consumerProperties.getProperty("auto.offset.reset");
            return autoOffsetReset == null || autoOffsetReset.equals("latest");
        }

        /**
         * Resolve {@code max.poll.interval.ms} in milliseconds.
         * <p>The container's consumer property overrides are consulted first,
         * then the consumer factory's configuration. Values may be a
         * {@link Duration}, a {@link Number} or a numeric {@link String}.
         * @param consumerProperties the consumer property overrides; the
         * overrides may be held as {@link Properties} defaults.
         * @return the configured interval, or 30 seconds when the property is
         * absent or has an unsupported type (a warning is logged for the latter).
         * @throws NumberFormatException if a String value is not a valid long.
         */
        private long obtainMaxPollInterval(Properties consumerProperties) {
            final String key = "max.poll.interval.ms";
            // getProperty() also searches the Properties defaults, which is where the container
            // overrides live; Hashtable.get() alone would never see them.
            Object timeout = consumerProperties.getProperty(key);
            if (timeout == null) {
                // Non-String values stored directly in the table are only reachable through get().
                timeout = consumerProperties.get(key);
            }
            if (timeout == null) {
                timeout = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties().get(key);
            }
            if (timeout instanceof Duration) {
                return ((Duration) timeout).toMillis();
            }
            if (timeout instanceof Number) {
                return ((Number) timeout).longValue();
            }
            if (timeout instanceof String) {
                return Long.parseLong(((String) timeout).trim());
            }
            if (timeout != null) {
                // Effectively-final copy for the lambda.
                Object timeoutToLog = timeout;
                this.logger.warn(() -> "Unexpected type: " + timeoutToLog.getClass().getName() + " in property '" + key + "'; defaulting to 30 seconds.");
            }
            return Duration.ofSeconds(30L).toMillis();
        }

        /**
         * Returns the listener viewed as a {@link ConsumerSeekAware}, if it is one.
         *
         * @param candidate the listener to inspect
         * @return the same object cast to {@code ConsumerSeekAware}, or {@code null}
         *         when it does not implement that interface
         */
        @Nullable
        private ConsumerSeekAware checkConsumerSeekAware(GenericMessageListener<?> candidate) {
            if (candidate instanceof ConsumerSeekAware) {
                return (ConsumerSeekAware)((Object)candidate);
            }
            return null;
        }

        /**
         * Reports the {@code consumerPaused} flag, which this class sets after pausing
         * the consumer's assignment and clears after resuming it.
         *
         * @return the current value of the flag
         */
        boolean isConsumerPaused() {
            return this.consumerPaused;
        }

        /**
         * Creates a {@link TransactionTemplate} around the configured transaction manager.
         *
         * @return a new template, or {@code null} when no transaction manager is set
         */
        @Nullable
        private TransactionTemplate determineTransactionTemplate() {
            if (this.transactionManager == null) {
                return null;
            }
            return new TransactionTemplate(this.transactionManager);
        }

        /**
         * Determines whether the consumer auto-commits offsets.
         * <p>
         * An {@code enable.auto.commit} override in {@code consumerProperties} wins; next
         * comes the consumer factory setting when the factory configuration contains the
         * key. If neither is present the property is explicitly set to {@code "false"}
         * in {@code consumerProperties}.
         *
         * @param consumerProperties consumer property overrides; may be mutated as described
         * @return {@code true} if auto commit is enabled
         * @throws IllegalStateException if auto commit is enabled together with a manual ack mode
         */
        private boolean determineAutoCommit(Properties consumerProperties) {
            String override = consumerProperties.getProperty("enable.auto.commit");
            boolean factoryHasSetting = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties().containsKey("enable.auto.commit");
            final boolean autoCommitEnabled;
            if (override != null) {
                autoCommitEnabled = Boolean.parseBoolean(override);
            } else if (factoryHasSetting) {
                autoCommitEnabled = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
            } else {
                // Nothing configured anywhere: turn auto commit off explicitly.
                consumerProperties.setProperty("enable.auto.commit", "false");
                autoCommitEnabled = false;
            }
            Assert.state(!(this.isAnyManualAck && autoCommitEnabled), () -> "Consumer cannot be configured for auto commit for ackMode " + (Object)((Object)this.containerProperties.getAckMode()));
            return autoCommitEnabled;
        }

        /**
         * Resolves the timeout used for synchronous commits.
         * <p>
         * The container property is used when set; otherwise {@code default.api.timeout.ms}
         * is looked up in the container's Kafka consumer properties and then in the
         * consumer factory configuration.
         *
         * @return the resolved timeout, or 60 seconds when nothing usable is configured
         * @throws NumberFormatException if the property is a non-numeric string
         */
        private Duration determineSyncCommitTimeout() {
            Duration explicit = this.containerProperties.getSyncCommitTimeout();
            if (explicit != null) {
                return explicit;
            }
            Object override = this.containerProperties.getKafkaConsumerProperties().get("default.api.timeout.ms");
            Object configured = override != null
                    ? override
                    : KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties().get("default.api.timeout.ms");
            if (configured instanceof Duration) {
                return (Duration)configured;
            } else if (configured instanceof Number) {
                return Duration.ofMillis(((Number)configured).longValue());
            } else if (configured instanceof String) {
                return Duration.ofMillis(Long.parseLong((String)configured));
            }
            if (configured != null) {
                // Present but of a type we cannot convert: warn and fall back to the default.
                this.logger.warn(() -> "Unexpected type: " + configured.getClass().getName() + " in property '" + "default.api.timeout.ms" + "'; defaulting to 60 seconds for sync commit timeouts");
            }
            return Duration.ofSeconds(60L);
        }

        /**
         * Finds the deserializer in use for keys or values.
         *
         * @param props   consumer configuration consulted when the factory holds no
         *                deserializer instance
         * @param isValue {@code true} for the value deserializer, {@code false} for the key
         * @return the {@link Class} of the factory's deserializer instance, or else the
         *         raw {@code value.deserializer}/{@code key.deserializer} entry from
         *         {@code props} (possibly {@code null})
         */
        private Object findDeserializerClass(Map<String, Object> props, boolean isValue) {
            Object fromFactory;
            if (isValue) {
                fromFactory = KafkaMessageListenerContainer.this.consumerFactory.getValueDeserializer();
            } else {
                fromFactory = KafkaMessageListenerContainer.this.consumerFactory.getKeyDeserializer();
            }
            if (fromFactory != null) {
                return fromFactory.getClass();
            }
            String configKey = isValue ? "value.deserializer" : "key.deserializer";
            return props.get(configKey);
        }

        /**
         * Attaches the consumer to its topics.
         * <p>
         * When explicit topic partitions are configured they are recorded in
         * {@code definedPartitions} and assigned manually; otherwise the consumer
         * subscribes by topic pattern or by topic names with a new
         * {@code ListenerConsumerRebalanceListener}.
         *
         * @param subscribingConsumer the consumer to subscribe or assign
         */
        private void subscribeOrAssignTopics(Consumer<? super K, ? super V> subscribingConsumer) {
            if (KafkaMessageListenerContainer.this.topicPartitions != null) {
                List<TopicPartitionOffset> requested = Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
                this.definedPartitions = new HashMap<TopicPartition, OffsetMetadata>(requested.size());
                for (TopicPartitionOffset tpo : requested) {
                    OffsetMetadata metadata = new OffsetMetadata(tpo.getOffset(), tpo.isRelativeToCurrent(), tpo.getPosition());
                    this.definedPartitions.put(tpo.getTopicPartition(), metadata);
                }
                subscribingConsumer.assign(new ArrayList<TopicPartition>(this.definedPartitions.keySet()));
                return;
            }
            ListenerConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
            Pattern topicPattern = this.containerProperties.getTopicPattern();
            if (topicPattern == null) {
                subscribingConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), (ConsumerRebalanceListener)rebalanceListener);
            } else {
                subscribingConsumer.subscribe(topicPattern, (ConsumerRebalanceListener)rebalanceListener);
            }
        }

        /**
         * Tests whether a deserializer setting refers to {@code ErrorHandlingDeserializer2}.
         *
         * @param deser a {@link Class}, a class-name {@link String}, or anything else
         * @return {@code true} for a class assignable to {@code ErrorHandlingDeserializer2}
         *         or a string equal to its fully-qualified name
         */
        private boolean checkDeserializer(Object deser) {
            if (deser instanceof Class) {
                return ErrorHandlingDeserializer2.class.isAssignableFrom((Class)deser);
            }
            if (deser instanceof String) {
                return deser.equals(ErrorHandlingDeserializer2.class.getName());
            }
            return false;
        }

        /**
         * Publishes a non-responsive-consumer event when the time since the last poll,
         * expressed as a multiple of the poll timeout, exceeds the configured
         * no-poll threshold.
         */
        protected void checkConsumer() {
            long sinceLastPoll = System.currentTimeMillis() - this.lastPoll;
            float pollTimeoutsElapsed = (float)sinceLastPoll / (float)this.containerProperties.getPollTimeout();
            if (pollTimeoutsElapsed > this.containerProperties.getNoPollThreshold()) {
                KafkaMessageListenerContainer.this.publishNonResponsiveConsumerEvent(sinceLastPoll, this.consumer);
            }
        }

        /**
         * Chooses the batch error handler.
         *
         * @param errHandler the configured handler, possibly {@code null}
         * @return {@code errHandler} cast to {@link BatchErrorHandler} when present;
         *         otherwise {@code null} if a transaction manager is configured, or a
         *         new {@code BatchLoggingErrorHandler}
         * @throws ClassCastException if {@code errHandler} is not a {@code BatchErrorHandler}
         */
        protected BatchErrorHandler determineBatchErrorHandler(GenericErrorHandler<?> errHandler) {
            if (errHandler != null) {
                return (BatchErrorHandler)errHandler;
            }
            if (this.transactionManager != null) {
                return null;
            }
            return new BatchLoggingErrorHandler();
        }

        /**
         * Chooses the record error handler.
         *
         * @param errHandler the configured handler, possibly {@code null}
         * @return {@code errHandler} cast to {@link ErrorHandler} when present;
         *         otherwise {@code null} if a transaction manager is configured, or a
         *         new {@code LoggingErrorHandler}
         * @throws ClassCastException if {@code errHandler} is not an {@code ErrorHandler}
         */
        protected ErrorHandler determineErrorHandler(GenericErrorHandler<?> errHandler) {
            if (errHandler != null) {
                return (ErrorHandler)errHandler;
            }
            if (this.transactionManager != null) {
                return null;
            }
            return new LoggingErrorHandler();
        }

        /**
         * Creates the {@code MicrometerHolder} when Micrometer is present and enabled
         * in the container properties.
         *
         * @return the holder, or {@code null} when Micrometer is absent, disabled, or
         *         the holder's construction throws {@link IllegalStateException}
         */
        @Nullable
        private MicrometerHolder obtainMicrometerHolder() {
            try {
                if (MICROMETER_PRESENT && this.containerProperties.isMicrometerEnabled()) {
                    return new MicrometerHolder(KafkaMessageListenerContainer.this.getApplicationContext(), KafkaMessageListenerContainer.this.getBeanName(), this.containerProperties.getMicrometerTags());
                }
            }
            catch (IllegalStateException ignored) {
                // Deliberately ignored: metrics are optional, so run without a holder.
                // NOTE(review): presumably raised when no usable MeterRegistry exists - confirm.
            }
            return null;
        }

        /**
         * Registers this consumer as the seek callback and hands the seek-aware
         * listener the current position of each given partition.
         *
         * @param partitions the partitions whose positions are reported
         * @param idle       {@code true} to call {@code onIdleContainer}, {@code false}
         *                   to call {@code onPartitionsAssigned}
         */
        private void seekPartitions(Collection<TopicPartition> partitions, boolean idle) {
            this.consumerSeekAwareListener.registerSeekCallback(this);
            Map<TopicPartition, Long> positions = new HashMap<>();
            for (TopicPartition partition : partitions) {
                positions.put(partition, this.consumer.position(partition));
            }
            if (!idle) {
                this.consumerSeekAwareListener.onPartitionsAssigned(positions, this.seekCallback);
                return;
            }
            this.consumerSeekAwareListener.onIdleContainer(positions, this.seekCallback);
        }

        /**
         * Verifies that the container's error handler, if any, matches the listener kind.
         *
         * @param batch {@code true} when a {@code BatchErrorHandler} is required,
         *              {@code false} when an {@code ErrorHandler} is required
         * @throws IllegalStateException if a handler is configured but is of the wrong kind
         */
        private void validateErrorHandler(boolean batch) {
            GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
            if (errHandler != null) {
                Class<?> requiredType = batch ? BatchErrorHandler.class : ErrorHandler.class;
                boolean compatible = requiredType.isAssignableFrom(errHandler.getClass());
                Assert.state(compatible, () -> "Error handler is not compatible with the message listener, expecting an instance of " + (batch ? "BatchErrorHandler" : "ErrorHandler") + " not " + errHandler.getClass().getName());
            }
        }

        /**
         * Always reports this task as long-lived.
         *
         * @return {@code true}
         */
        public boolean isLongLived() {
            return true;
        }

        /**
         * Body of the consumer thread.
         * <p>
         * Publishes the starting/started events, initialises thread-bound state and
         * then calls {@link #pollAndInvoke()} repeatedly while the container is running.
         * {@code wrapUp()} is invoked when the loop ends, whether normally, through a
         * fatal exception, or through an {@link Error} (which is rethrown afterwards).
         */
        public void run() {
            KafkaMessageListenerContainer.this.publishConsumerStartingEvent();
            // Remember the polling thread; processAck compares against it to detect acks from other threads.
            this.consumerThread = Thread.currentThread();
            if (this.consumerSeekAwareListener != null) {
                this.consumerSeekAwareListener.registerSeekCallback(this);
            }
            KafkaUtils.setConsumerGroupId(this.consumerGroupId);
            this.count = 0;
            this.last = System.currentTimeMillis();
            this.initAssignedPartitions();
            KafkaMessageListenerContainer.this.publishConsumerStartedEvent();
            while (KafkaMessageListenerContainer.this.isRunning()) {
                try {
                    this.pollAndInvoke();
                }
                // A wakeup only interrupts the poll; the loop condition decides whether to continue.
                catch (WakeupException wakeupException) {
                }
                catch (NoOffsetForPartitionException nofpe) {
                    // Fatal: leave the loop; wrapUp() stops the container when fatalError is set.
                    this.fatalError = true;
                    this.logger.error((Throwable)nofpe, (CharSequence)"No offset and no reset policy");
                    break;
                }
                catch (AuthorizationException ae) {
                    // Fatal unless a retry interval is configured, in which case sleep and poll again.
                    if (this.authorizationExceptionRetryInterval == null) {
                        this.logger.error((Throwable)ae, (CharSequence)"Authorization Exception and no authorizationExceptionRetryInterval set");
                        this.fatalError = true;
                        break;
                    }
                    this.logger.error((Throwable)ae, (CharSequence)("Authorization Exception, retrying in " + this.authorizationExceptionRetryInterval.toMillis() + " ms"));
                    this.sleepFor(this.authorizationExceptionRetryInterval);
                }
                catch (Exception e) {
                    // Any other exception is delegated to the error handlers and the loop continues.
                    this.handleConsumerException(e);
                }
                catch (Error e) {
                    // Run the emergency-stop hook (if set), clean up, then propagate the Error.
                    Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
                    if (runnable != null) {
                        runnable.run();
                    }
                    this.logger.error((Throwable)e, (CharSequence)"Stopping container due to an Error");
                    this.wrapUp();
                    throw e;
                }
            }
            this.wrapUp();
        }

        /**
         * Applies the initial offsets for explicitly defined partitions.
         * <p>
         * Does nothing unless the container is running and {@code definedPartitions}
         * is set. Any exception from {@code initPartitionsIfNeeded()} is logged and
         * not propagated.
         */
        private void initAssignedPartitions() {
            if (!KafkaMessageListenerContainer.this.isRunning() || this.definedPartitions == null) {
                return;
            }
            try {
                this.initPartitionsIfNeeded();
            }
            catch (Exception initFailure) {
                this.logger.error((Throwable)initFailure, (CharSequence)"Failed to set initial offsets");
            }
        }

        /**
         * Performs one iteration of the consumer loop: processes pending commits and
         * seeks, applies pause state, polls, and dispatches the polled records to the
         * listener (or runs the idle check when nothing was received).
         * <p>
         * The {@code polling} flag is raised around the poll. If another thread has
         * already cleared it by the time the poll returns (see
         * {@code wakeIfNecessary()}), the polled records are discarded and the method
         * returns without invoking the listener.
         */
        protected void pollAndInvoke() {
            if (!this.autoCommit && !this.isRecordAck) {
                this.processCommits();
            }
            this.idleBetweenPollIfNecessary();
            if (this.seeks.size() > 0) {
                this.processSeeks();
            }
            this.pauseConsumerIfNecessary();
            this.lastPoll = System.currentTimeMillis();
            this.polling.set(true);
            ConsumerRecords records = this.doPoll();
            if (!this.polling.compareAndSet(true, false)) {
                // The flag was cleared while we were polling. Guard against a null poll
                // result here as well: the code below already treats records as nullable,
                // and an unguarded count() would throw a NullPointerException.
                if (records != null && records.count() > 0) {
                    this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
                }
                return;
            }
            this.resumeConsumerIfNeccessary();
            this.debugRecords(records);
            if (records != null && records.count() > 0) {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    // Only tracked when idle events are enabled.
                    this.lastReceive = System.currentTimeMillis();
                }
                this.invokeListener(records);
            } else {
                this.checkIdle();
            }
        }

        /**
         * Polls the consumer.
         * <p>
         * For a batch listener with {@code subBatchPerPartition} enabled, the most
         * recent poll result is cached in {@code lastBatch} and handed out one
         * partition at a time on successive calls; a new poll is issued only once
         * every partition of the cached batch has been returned. Otherwise the poll
         * result is returned as is.
         *
         * @return the records to process in this iteration
         */
        private ConsumerRecords<K, V> doPoll() {
            if (!(this.isBatchListener && this.subBatchPerPartition)) {
                return this.consumer.poll(this.pollTimeout);
            }
            if (this.batchIterator == null) {
                this.lastBatch = this.consumer.poll(this.pollTimeout);
                if (this.lastBatch.count() == 0) {
                    return this.lastBatch;
                }
                this.batchIterator = this.lastBatch.partitions().iterator();
            }
            TopicPartition partition = this.batchIterator.next();
            List partitionRecords = this.lastBatch.records(partition);
            ConsumerRecords slice = new ConsumerRecords(Collections.singletonMap(partition, partitionRecords));
            if (!this.batchIterator.hasNext()) {
                // Cached batch exhausted; the next call polls again.
                this.batchIterator = null;
            }
            return slice;
        }

        /**
         * Wakes the consumer if the {@code polling} flag is currently set, clearing the
         * flag atomically in the same step.
         */
        void wakeIfNecessary() {
            boolean wasPolling = this.polling.getAndSet(false);
            if (wasPolling) {
                this.consumer.wakeup();
            }
        }

        /**
         * Logs the number of polled records at debug level and, when there are any,
         * their {@code topic-partition@offset} coordinates at trace level.
         *
         * @param records the poll result; ignored when {@code null}
         */
        private void debugRecords(ConsumerRecords<K, V> records) {
            if (records == null) {
                return;
            }
            this.logger.debug(() -> "Received: " + records.count() + " records");
            if (records.count() <= 0) {
                return;
            }
            this.logger.trace(() -> records.partitions().stream()
                    .flatMap(partition -> records.records(partition).stream())
                    .map(rec -> rec.topic() + "-" + rec.partition() + "@" + rec.offset())
                    .collect(Collectors.joining(", ", "[", "]")));
        }

        /**
         * Sleeps the current thread for the given duration. An interruption ends the
         * sleep early and is logged; it is not propagated.
         *
         * @param duration how long to sleep
         */
        private void sleepFor(Duration duration) {
            long millis = duration.toMillis();
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            }
            catch (InterruptedException interrupted) {
                // NOTE(review): the interrupt flag is not restored here - confirm this is intended.
                this.logger.error((Throwable)interrupted, (CharSequence)"Interrupted while sleeping");
            }
        }

        /**
         * Pauses every assigned partition when the container has been paused and the
         * consumer has not yet followed, then publishes the consumer-paused event.
         */
        private void pauseConsumerIfNecessary() {
            if (this.consumerPaused || !KafkaMessageListenerContainer.this.isPaused()) {
                return;
            }
            this.consumer.pause((Collection)this.consumer.assignment());
            this.consumerPaused = true;
            this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
            KafkaMessageListenerContainer.this.publishConsumerPausedEvent(this.consumer.assignment());
        }

        /**
         * Resumes the currently paused partitions when the container is no longer
         * paused but the consumer still is, then publishes the consumer-resumed event.
         */
        private void resumeConsumerIfNeccessary() {
            if (!this.consumerPaused || KafkaMessageListenerContainer.this.isPaused()) {
                return;
            }
            this.logger.debug(() -> "Resuming consumption from: " + this.consumer.paused());
            Set pausedPartitions = this.consumer.paused();
            this.consumer.resume((Collection)pausedPartitions);
            this.consumerPaused = false;
            KafkaMessageListenerContainer.this.publishConsumerResumedEvent(pausedPartitions);
        }

        /**
         * Publishes an idle-container event when an idle event interval is configured
         * and more than that interval has passed both since the last receive and since
         * the last idle alert. When a seek-aware listener is present and partitions are
         * assigned, it is also given the chance to seek.
         */
        private void checkIdle() {
            if (this.containerProperties.getIdleEventInterval() == null) {
                return;
            }
            long now = System.currentTimeMillis();
            boolean idleSinceReceive = now > this.lastReceive + this.containerProperties.getIdleEventInterval();
            if (!idleSinceReceive || now <= this.lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                return;
            }
            KafkaMessageListenerContainer.this.publishIdleContainerEvent(now - this.lastReceive, this.isConsumerAwareListener ? this.consumer : null, this.consumerPaused);
            this.lastAlertAt = now;
            if (this.consumerSeekAwareListener == null) {
                return;
            }
            Collection<TopicPartition> assigned = KafkaMessageListenerContainer.this.getAssignedPartitions();
            if (assigned != null) {
                this.seekPartitions(assigned, true);
            }
        }

        /**
         * Sleeps between polls when {@code idleBetweenPolls} is positive.
         * <p>
         * The sleep is capped at the max poll interval minus the time already elapsed
         * since the last poll, minus a further 5000 ms; no sleep happens when that cap
         * is not positive.
         *
         * @throws IllegalStateException if the thread is interrupted while sleeping
         *                               (the interrupt flag is restored first)
         */
        private void idleBetweenPollIfNecessary() {
            long configuredIdle = this.containerProperties.getIdleBetweenPolls();
            if (configuredIdle <= 0L) {
                return;
            }
            long sinceLastPoll = System.currentTimeMillis() - this.lastPoll;
            long sleepMillis = Math.min(configuredIdle, this.maxPollInterval - sinceLastPoll - 5000L);
            if (sleepMillis <= 0L) {
                return;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(sleepMillis);
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Consumer Thread [" + this + "] has been interrupted", interrupted);
            }
        }

        /**
         * Tears the consumer down after the poll loop has ended.
         * <p>
         * Clears thread-bound state, publishes the stopping event, commits pending
         * acks and unsubscribes (or closes producers when a Kafka transaction manager
         * is in use), cancels the monitor task, closes the consumer, notifies the
         * seek-aware listener and finally publishes the stopped event. After a fatal
         * error the commit/unsubscribe step is skipped and the container is stopped
         * instead.
         */
        private void wrapUp() {
            KafkaUtils.clearConsumerGroupId();
            if (this.micrometerHolder != null) {
                this.micrometerHolder.destroy();
            }
            KafkaMessageListenerContainer.this.publishConsumerStoppingEvent(this.consumer);
            // Captured up front; passed to closeProducers and onPartitionsRevoked below.
            Collection<TopicPartition> partitions = KafkaMessageListenerContainer.this.getAssignedPartitions();
            if (!this.fatalError) {
                if (this.kafkaTxManager == null) {
                    this.commitPendingAcks();
                    try {
                        this.consumer.unsubscribe();
                    }
                    // A pending wakeup must not abort the shutdown sequence.
                    catch (WakeupException wakeupException) {}
                } else {
                    this.closeProducers(partitions);
                }
            } else {
                this.logger.error((CharSequence)"Fatal consumer exception; stopping container");
                KafkaMessageListenerContainer.this.stop();
            }
            this.monitorTask.cancel(true);
            // Only destroy the scheduler when it was not supplied explicitly.
            if (!this.taskSchedulerExplicitlySet) {
                ((ThreadPoolTaskScheduler)this.taskScheduler).destroy();
            }
            this.consumer.close();
            KafkaMessageListenerContainer.this.getAfterRollbackProcessor().clearThreadState();
            if (this.errorHandler != null) {
                this.errorHandler.clearThreadState();
            }
            if (this.consumerSeekAwareListener != null) {
                this.consumerSeekAwareListener.onPartitionsRevoked(partitions);
                this.consumerSeekAwareListener.unregisterSeekCallback();
            }
            this.logger.info(() -> KafkaMessageListenerContainer.this.getGroupId() + ": Consumer stopped");
            KafkaMessageListenerContainer.this.publishConsumerStoppedEvent();
        }

        /**
         * Routes an exception raised in the consumer loop to the error handler that
         * matches the listener kind, passing an empty set of records. When no matching
         * handler is configured the exception is logged. Exceptions thrown by the
         * handler itself are logged and not propagated.
         *
         * @param e the exception to handle
         */
        protected void handleConsumerException(Exception e) {
            try {
                if (this.isBatchListener && this.batchErrorHandler != null) {
                    this.batchErrorHandler.handle(e, new ConsumerRecords(Collections.emptyMap()), this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer);
                } else if (!this.isBatchListener && this.errorHandler != null) {
                    this.errorHandler.handle(e, Collections.emptyList(), this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer);
                } else {
                    this.logger.error((Throwable)e, (CharSequence)"Consumer exception");
                }
            }
            catch (Exception handlerFailure) {
                this.logger.error((Throwable)handlerFailure, (CharSequence)"Consumer exception");
            }
        }

        /**
         * Processes outstanding commits and then, if any offsets remain recorded,
         * commits them.
         */
        private void commitPendingAcks() {
            this.processCommits();
            boolean offsetsRemain = !this.offsets.isEmpty();
            if (offsetsRemain) {
                this.commitIfNecessary();
            }
        }

        /**
         * Drains the {@code acks} queue, tracing and processing each queued record in
         * turn until the queue is empty.
         */
        private void handleAcks() {
            ConsumerRecord pending;
            while ((pending = (ConsumerRecord)this.acks.poll()) != null) {
                this.traceAck(pending);
                this.processAck(pending);
            }
        }

        /**
         * Logs the acknowledged record at trace level.
         *
         * @param record the record being acknowledged
         */
        private void traceAck(ConsumerRecord<K, V> record) {
            this.logger.trace(() -> "Ack: " + record);
        }

        /**
         * Processes an acknowledgment for a record.
         * <p>
         * On the consumer thread the record is either committed immediately (manual
         * immediate ack mode; a {@link WakeupException} during the commit is ignored)
         * or added to the pending offsets. On any other thread the record is queued in
         * {@code acks}, and in manual immediate ack mode the consumer is woken up
         * afterwards.
         *
         * @param record the acknowledged record
         * @throws KafkaException if interrupted while queueing the ack (the interrupt
         *                        flag is restored first)
         */
        private void processAck(ConsumerRecord<K, V> record) {
            if (Thread.currentThread().equals(this.consumerThread)) {
                if (!this.isManualImmediateAck) {
                    this.addOffset(record);
                    return;
                }
                try {
                    this.ackImmediate(record);
                }
                catch (WakeupException ignored) {
                    // Deliberately ignored, as in the original flow.
                }
                return;
            }
            try {
                this.acks.put(record);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted while storing ack", e);
            }
            if (this.isManualImmediateAck) {
                this.consumer.wakeup();
            }
        }

        /**
         * Commits the offset following the given record (its offset plus one) for the
         * record's partition, synchronously or asynchronously according to
         * {@code syncCommits}.
         *
         * @param record the record to acknowledge
         */
        private void ackImmediate(ConsumerRecord<K, V> record) {
            TopicPartition partition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata nextOffset = new OffsetAndMetadata(record.offset() + 1L);
            Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(partition, nextOffset);
            this.commitLogger.log(() -> "Committing: " + commits);
            if (!this.syncCommits) {
                this.consumer.commitAsync(commits, this.commitCallback);
                return;
            }
            this.consumer.commitSync(commits, this.syncCommitTimeout);
        }

        /**
         * Dispatches the polled records to the batch or the record listener path.
         *
         * @param records the polled records
         */
        private void invokeListener(ConsumerRecords<K, V> records) {
            if (!this.isBatchListener) {
                this.invokeRecordListener(records);
                return;
            }
            this.invokeBatchListener(records);
        }

        /**
         * Invokes the batch listener, inside a transaction when a transaction template
         * is configured.
         * <p>
         * Unless the listener wants the full {@code ConsumerRecords}, the records are
         * first flattened into a list and the listener is skipped when that list is
         * empty. When the listener wants full records the list passed on is {@code null}.
         *
         * @param records the polled records
         */
        private void invokeBatchListener(ConsumerRecords<K, V> records) {
            List recordList = this.wantsFullRecords ? null : this.createRecordList(records);
            if (!this.wantsFullRecords && recordList.isEmpty()) {
                return;
            }
            if (this.transactionTemplate == null) {
                this.doInvokeBatchListener(records, recordList, null);
                return;
            }
            this.invokeBatchListenerInTx(records, recordList);
        }

        /**
         * Invokes the batch listener inside a transaction.
         * <p>
         * With {@code subBatchPerPartition} enabled, a transaction id suffix derived
         * from the first record's topic and partition is set for the duration of the
         * call and cleared afterwards. If the transaction rolls back, the
         * after-rollback processor is run, inside a new transaction when it asks for
         * that. A {@link ProducerFencedException} is only logged.
         *
         * @param records    the polled records
         * @param recordList the flattened records, or {@code null} when the listener
         *                   consumes the full {@code ConsumerRecords}
         */
        private void invokeBatchListenerInTx(final ConsumerRecords<K, V> records, final List<ConsumerRecord<K, V>> recordList) {
            try {
                if (this.subBatchPerPartition) {
                    // recordList is null for listeners that take the full ConsumerRecords;
                    // fall back to the first polled record instead of dereferencing null.
                    ConsumerRecord<K, V> record = recordList == null ? records.iterator().next() : recordList.get(0);
                    TransactionSupport.setTransactionIdSuffix(this.zombieFenceTxIdSuffix(record.topic(), record.partition()));
                }
                this.transactionTemplate.execute((TransactionCallback)new TransactionCallbackWithoutResult(){

                    public void doInTransactionWithoutResult(TransactionStatus s) {
                        RuntimeException aborted;
                        Producer producer = null;
                        if (ListenerConsumer.this.kafkaTxManager != null) {
                            producer = ((KafkaResourceHolder)((Object)TransactionSynchronizationManager.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))).getProducer();
                        }
                        // A non-null result is an exception from the error handler: rethrow to roll back.
                        if ((aborted = ListenerConsumer.this.doInvokeBatchListener(records, recordList, producer)) != null) {
                            throw aborted;
                        }
                    }
                });
            }
            catch (ProducerFencedException e) {
                this.logger.error((Throwable)e, (CharSequence)"Producer fenced during transaction");
            }
            catch (RuntimeException e) {
                this.logger.error((Throwable)e, (CharSequence)"Transaction rolled back");
                final AfterRollbackProcessor afterRollbackProcessorToUse = KafkaMessageListenerContainer.this.getAfterRollbackProcessor();
                if (afterRollbackProcessorToUse.isProcessInTransaction() && this.transactionTemplate != null) {
                    this.transactionTemplate.execute((TransactionCallback)new TransactionCallbackWithoutResult(){

                        protected void doInTransactionWithoutResult(TransactionStatus status) {
                            ListenerConsumer.this.batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
                        }
                    });
                } else {
                    this.batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
                }
            }
            finally {
                if (this.subBatchPerPartition) {
                    TransactionSupport.clearTransactionIdSuffix();
                }
            }
        }

        /**
         * Runs the after-rollback processor for a failed batch. Exceptions thrown while
         * doing so are logged and not propagated.
         *
         * @param records                     the polled records, flattened when
         *                                    {@code recordList} is {@code null}
         * @param recordList                  the flattened records, or {@code null}
         * @param e                           the exception that caused the rollback
         * @param afterRollbackProcessorToUse the processor to invoke
         */
        private void batchAfterRollback(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList, RuntimeException e, AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
            try {
                List<ConsumerRecord<K, V>> toProcess = recordList != null ? recordList : this.createRecordList(records);
                afterRollbackProcessorToUse.process(toProcess, this.consumer, e, false);
            }
            catch (Exception processorFailure) {
                this.logger.error((Throwable)processorFailure, (CharSequence)"AfterRollbackProcessor threw exception");
            }
        }

        /**
         * Flattens the polled records into a single list, in iteration order.
         * <p>
         * The list is an {@link ArrayList} presized to {@code records.count()}, which
         * avoids the per-node allocation of a linked list and gives constant-time
         * indexed access to callers.
         *
         * @param records the polled records
         * @return a new mutable list containing every record
         */
        private List<ConsumerRecord<K, V>> createRecordList(ConsumerRecords<K, V> records) {
            List<ConsumerRecord<K, V>> list = new ArrayList<>(records.count());
            for (ConsumerRecord<K, V> record : records) {
                list.add(record);
            }
            return list;
        }

        /**
         * Invokes the batch listener, recording success/failure timers, and applies
         * the batch error handler when the listener throws.
         *
         * @param records    the polled records
         * @param recordList the flattened records, or {@code null}
         * @param producer   the transactional producer, or {@code null} when not
         *                   running in a Kafka transaction
         * @return the {@link RuntimeException} thrown by the batch error handler, or
         *         {@code null} when the listener succeeded or the handler coped
         * @throws RuntimeException the listener's exception, when no batch error
         *                          handler is configured
         */
        private RuntimeException doInvokeBatchListener(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList, Producer producer) {
            Object sample = this.startMicrometerSample();
            try {
                this.invokeBatchOnMessage(records, recordList, producer);
                this.successTimer(sample);
            }
            catch (RuntimeException e) {
                boolean acked;
                this.failureTimer(sample);
                // Ack-on-error only applies to container-managed, non-transactional commits.
                boolean bl = acked = this.containerProperties.isAckOnError() && !this.autoCommit && producer == null;
                if (acked) {
                    this.acks.addAll(this.getHighestOffsetRecords(records));
                }
                if (this.batchErrorHandler == null) {
                    throw e;
                }
                try {
                    this.invokeBatchErrorHandler(records, e);
                    // After successful handling, queue the highest offsets unless already done above;
                    // within a transaction they are always queued and sent to the transaction.
                    if (!acked && !this.autoCommit && this.batchErrorHandler.isAckAfterHandle() || producer != null) {
                        this.acks.addAll(this.getHighestOffsetRecords(records));
                        if (producer != null) {
                            this.sendOffsetsToTransaction(producer);
                        }
                    }
                }
                catch (RuntimeException ee) {
                    // Returned rather than thrown; the caller decides how to propagate it.
                    this.logger.error((Throwable)ee, (CharSequence)"Error handler threw an exception");
                    return ee;
                }
                catch (Error er) {
                    this.logger.error((Throwable)er, (CharSequence)"Error handler threw an error");
                    throw er;
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag; the batch is treated as not failed.
                Thread.currentThread().interrupt();
            }
            return null;
        }

        /**
         * Starts a timer sample when a {@code MicrometerHolder} is available.
         *
         * @return the sample returned by the holder, or {@code null} when there is no holder
         */
        @Nullable
        private Object startMicrometerSample() {
            return this.micrometerHolder != null ? this.micrometerHolder.start() : null;
        }

        /**
         * Reports a successful listener invocation for the given sample.
         *
         * @param sample the sample from {@code startMicrometerSample()}; nothing
         *               happens when it is {@code null}
         */
        private void successTimer(@Nullable Object sample) {
            if (sample == null) {
                return;
            }
            this.micrometerHolder.success(sample);
        }

        /**
         * Reports a failed listener invocation for the given sample.
         *
         * @param sample the sample from {@code startMicrometerSample()}; nothing
         *               happens when it is {@code null}
         */
        private void failureTimer(@Nullable Object sample) {
            if (sample == null) {
                return;
            }
            this.micrometerHolder.failure(sample);
        }

        /**
         * Calls the batch listener and then performs the post-invocation ack handling.
         * <p>
         * After the listener returns: if a nack was requested ({@code nackSleep >= 0}),
         * records before {@code nackIndex} are queued as acks and the rest are
         * collected for a seek; for transactional or non-manual, non-auto-commit
         * modes the highest-offset records are queued (and sent to the transaction
         * when a producer is present); finally any nacked records are sought and the
         * nack sleep is applied.
         *
         * @param records    the polled records
         * @param recordList the flattened records, or {@code null} when the listener
         *                   wants the full {@code ConsumerRecords}
         * @param producer   the transactional producer, or {@code null}
         * @throws InterruptedException if interrupted while queueing an ack
         */
        private void invokeBatchOnMessage(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList, Producer producer) throws InterruptedException {
            if (this.wantsFullRecords) {
                this.batchListener.onMessage(records, (Acknowledgment)(this.isAnyManualAck ? new ConsumerBatchAcknowledgment(records) : null), this.consumer);
            } else {
                this.doInvokeBatchOnMessage(records, recordList);
            }
            ArrayList toSeek = null;
            // A non-negative nackSleep means the listener nacked part of the batch.
            if (this.nackSleep >= 0L) {
                int index = 0;
                toSeek = new ArrayList();
                for (ConsumerRecord record : records) {
                    // Records from nackIndex onwards are re-sought; earlier ones are acked.
                    if (index++ >= this.nackIndex) {
                        toSeek.add(record);
                        continue;
                    }
                    this.acks.put(record);
                }
            }
            if (producer != null || !this.isAnyManualAck && !this.autoCommit) {
                for (ConsumerRecord record : this.getHighestOffsetRecords(records)) {
                    this.acks.put(record);
                }
                if (producer != null) {
                    this.sendOffsetsToTransaction(producer);
                }
            }
            if (toSeek != null) {
                // Commit what was acked before seeking back to the nacked records.
                if (!this.autoCommit) {
                    this.processCommits();
                }
                SeekUtils.doSeeks(toSeek, this.consumer, null, true, (rec, ex) -> false, this.logger);
                this.nackSleepAndReset();
            }
        }

        /**
         * Calls the {@code onMessage} overload of the batch listener that matches
         * {@code listenerType}. For the acknowledging types, a
         * {@code ConsumerBatchAcknowledgment} is supplied in any manual ack mode and
         * {@code null} otherwise.
         *
         * @param records    the polled records, used to build the acknowledgment
         * @param recordList the flattened records handed to the listener
         */
        private void doInvokeBatchOnMessage(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList) {
            switch (this.listenerType) {
                case SIMPLE: {
                    this.batchListener.onMessage(recordList);
                    break;
                }
                case CONSUMER_AWARE: {
                    this.batchListener.onMessage(recordList, this.consumer);
                    break;
                }
                case ACKNOWLEDGING: {
                    Acknowledgment ack = this.isAnyManualAck ? new ConsumerBatchAcknowledgment(records) : null;
                    this.batchListener.onMessage(recordList, ack);
                    break;
                }
                case ACKNOWLEDGING_CONSUMER_AWARE: {
                    Acknowledgment ack = this.isAnyManualAck ? new ConsumerBatchAcknowledgment(records) : null;
                    this.batchListener.onMessage(recordList, ack, this.consumer);
                    break;
                }
            }
        }

        /**
         * Passes a decorated copy of the listener exception to the batch error
         * handler, including the container reference when the handler is a
         * {@code ContainerAwareBatchErrorHandler}.
         *
         * @param records the records being processed when the exception occurred
         * @param e       the exception thrown by the listener
         */
        private void invokeBatchErrorHandler(ConsumerRecords<K, V> records, RuntimeException e) {
            if (!(this.batchErrorHandler instanceof ContainerAwareBatchErrorHandler)) {
                this.batchErrorHandler.handle(this.decorateException(e), records, this.consumer);
                return;
            }
            this.batchErrorHandler.handle(this.decorateException(e), records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer);
        }

        /**
         * Invokes the record listener for the polled records, inside transactions when
         * a transaction template is configured.
         *
         * @param records the polled records
         */
        private void invokeRecordListener(ConsumerRecords<K, V> records) {
            if (this.transactionTemplate == null) {
                this.doInvokeWithRecords(records);
                return;
            }
            this.invokeRecordListenerInTx(records);
        }

        /**
         * Invokes the record listener once per record, each in its own transaction.
         * <p>
         * For every record that passes {@code checkEarlyIntercept}, a transaction id
         * suffix derived from the record's topic and partition is set for the duration
         * of the transaction and cleared afterwards. A rolled-back transaction is
         * handed to {@code recordAfterRollback} together with the iterator; a
         * {@link ProducerFencedException} is only logged. Iteration stops after the
         * first record for which a nack was requested.
         *
         * @param records the polled records
         */
        private void invokeRecordListenerInTx(ConsumerRecords<K, V> records) {
            final Iterator iterator = records.iterator();
            while (iterator.hasNext()) {
                final ConsumerRecord record = this.checkEarlyIntercept((ConsumerRecord)iterator.next());
                // A null result means the record is skipped.
                if (record == null) continue;
                this.logger.trace(() -> "Processing " + record);
                try {
                    TransactionSupport.setTransactionIdSuffix(this.zombieFenceTxIdSuffix(record.topic(), record.partition()));
                    this.transactionTemplate.execute((TransactionCallback)new TransactionCallbackWithoutResult(){

                        public void doInTransactionWithoutResult(TransactionStatus s) {
                            RuntimeException aborted;
                            Producer producer = null;
                            if (ListenerConsumer.this.kafkaTxManager != null) {
                                producer = ((KafkaResourceHolder)((Object)TransactionSynchronizationManager.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))).getProducer();
                            }
                            // A non-null result is rethrown so that the transaction rolls back.
                            if ((aborted = ListenerConsumer.this.doInvokeRecordListener(record, producer, iterator)) != null) {
                                throw aborted;
                            }
                        }
                    });
                }
                catch (ProducerFencedException e) {
                    this.logger.error((Throwable)e, (CharSequence)"Producer fenced during transaction");
                }
                catch (RuntimeException e) {
                    this.logger.error((Throwable)e, (CharSequence)"Transaction rolled back");
                    this.recordAfterRollback(iterator, record, e);
                }
                finally {
                    TransactionSupport.clearTransactionIdSuffix();
                }
                // A non-negative nackSleep means the listener nacked this record: handle it and stop.
                if (this.nackSleep < 0L) continue;
                this.handleNack(records, record);
                break;
            }
        }

        /**
         * Hands the failed record and all records still left in {@code iterator} to the
         * container's after-rollback processor.
         * <p>When the processor reports {@code isProcessInTransaction()} and a transaction template
         * is available, the processor runs inside {@code transactionTemplate.execute} and any
         * exception it throws propagates. Otherwise the processor is called directly and any
         * exception it throws is logged and swallowed.
         *
         * @param iterator iterator positioned after {@code record}; it is fully drained
         * @param record the record whose processing was rolled back
         * @param e the exception that caused the rollback
         */
        private void recordAfterRollback(Iterator<ConsumerRecord<K, V>> iterator, ConsumerRecord<K, V> record, final RuntimeException e) {
            final List<ConsumerRecord<K, V>> remaining = new ArrayList<>();
            remaining.add(record);
            iterator.forEachRemaining(remaining::add);

            final AfterRollbackProcessor processor = KafkaMessageListenerContainer.this.getAfterRollbackProcessor();
            boolean runInTransaction = processor.isProcessInTransaction() && this.transactionTemplate != null;
            if (!runInTransaction) {
                try {
                    processor.process(remaining, this.consumer, e, true);
                }
                catch (Exception ex) {
                    this.logger.error((Throwable)ex, (CharSequence)"AfterRollbackProcessor threw exception");
                }
                return;
            }
            this.transactionTemplate.execute((TransactionCallback)new TransactionCallbackWithoutResult(){

                protected void doInTransactionWithoutResult(TransactionStatus status) {
                    processor.process(remaining, ListenerConsumer.this.consumer, e, true);
                }
            });
        }

        /**
         * Invokes the record listener for each polled record outside of a transaction.
         * <p>Records for which {@code checkEarlyIntercept} returns {@code null} are skipped.
         * If {@code nackSleep} is non-negative after a record has been processed,
         * {@code handleNack} is called and the rest of the batch is not processed here.
         *
         * @param records the records returned by the last poll
         */
        private void doInvokeWithRecords(ConsumerRecords<K, V> records) {
            // The iterator is shared with the error-handling path, which may consume the remainder.
            Iterator<ConsumerRecord<K, V>> remaining = records.iterator();
            while (remaining.hasNext()) {
                ConsumerRecord<K, V> current = this.checkEarlyIntercept(remaining.next());
                if (current != null) {
                    this.logger.trace(() -> "Processing " + current);
                    this.doInvokeRecordListener(current, null, remaining);
                    if (this.nackSleep >= 0L) {
                        this.handleNack(records, current);
                        break;
                    }
                }
            }
        }

        /**
         * Runs the early record interceptor, if one is configured, on the given record.
         *
         * @param nextArg the record about to be processed
         * @return {@code nextArg} unchanged when no early interceptor is configured; otherwise
         * whatever the interceptor returned, which may be {@code null} (logged at debug level)
         */
        private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> nextArg) {
            if (this.earlyRecordInterceptor == null) {
                return nextArg;
            }
            ConsumerRecord<K, V> intercepted = this.earlyRecordInterceptor.intercept(nextArg);
            if (intercepted == null && this.logger.isDebugEnabled()) {
                this.logger.debug((CharSequence)("RecordInterceptor returned null, skipping: " + nextArg));
            }
            return intercepted;
        }

        /**
         * Handles a negative acknowledgment of {@code record}.
         * <p>Pending commits are processed first unless auto-commit or per-record acks are in
         * use. The nacked record and every record after it in iteration order are then passed
         * to {@code SeekUtils.doSeeks} (presumably to reposition the consumer so they are
         * fetched again), after which the thread sleeps for the nack interval.
         *
         * @param records the batch currently being processed
         * @param record the record that was nacked; matched with {@code equals}
         */
        private void handleNack(ConsumerRecords<K, V> records, ConsumerRecord<K, V> record) {
            boolean commitsHandledHere = !(this.autoCommit || this.isRecordAck);
            if (commitsHandledHere) {
                this.processCommits();
            }
            List<ConsumerRecord<?, ?>> toReplay = new ArrayList<>();
            for (ConsumerRecord<K, V> candidate : records) {
                // Start collecting at the nacked record and keep everything that follows it.
                if (candidate.equals(record) || !toReplay.isEmpty()) {
                    toReplay.add(candidate);
                }
            }
            SeekUtils.doSeeks(toReplay, this.consumer, null, true, (rec, ex) -> false, this.logger);
            this.nackSleepAndReset();
        }

        /**
         * Sleeps for the current {@code nackSleep} interval and then resets {@code nackSleep}
         * to {@code -1}.
         * <p>If the sleep is interrupted, the thread's interrupt flag is restored and the reset
         * still happens.
         */
        private void nackSleepAndReset() {
            final long pauseMillis = this.nackSleep;
            try {
                Thread.sleep(pauseMillis);
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
            this.nackSleep = -1L;
        }

        /**
         * Invokes the listener for a single record, with timing and error handling.
         * <p>On success the success timer is recorded and {@code null} is returned. When the
         * listener throws a {@code RuntimeException}:
         * <ul>
         * <li>the failure timer is recorded;</li>
         * <li>the record is acknowledged immediately if ack-on-error is enabled, auto-commit is
         * off and there is no producer;</li>
         * <li>without an error handler the exception is rethrown;</li>
         * <li>otherwise the error handler is invoked and, if it returns normally, the record is
         * acknowledged when there is a producer, or when it was not already acked, auto-commit
         * is off and the handler reports {@code isAckAfterHandle()}.</li>
         * </ul>
         *
         * @param record the record to process
         * @param producer the transactional producer, or {@code null} when not in a Kafka transaction
         * @param iterator iterator over the remaining records; passed on to the error handler path
         * @return the {@code RuntimeException} thrown by the error handler (returned, not thrown),
         * or {@code null} if processing or error handling completed normally
         * @throws RuntimeException the listener's exception, when no error handler is configured
         * @throws Error any {@code Error} thrown by the error handler, after logging it
         */
        private RuntimeException doInvokeRecordListener(ConsumerRecord<K, V> record, Producer producer, Iterator<ConsumerRecord<K, V>> iterator) {
            Object sample = this.startMicrometerSample();
            try {
                this.invokeOnMessage(record, producer);
                this.successTimer(sample);
            }
            catch (RuntimeException e) {
                boolean acked;
                this.failureTimer(sample);
                // 'bl' is a decompiler artifact; 'acked' is the value that is used below.
                boolean bl = acked = this.containerProperties.isAckOnError() && !this.autoCommit && producer == null;
                if (acked) {
                    this.ackCurrent(record);
                }
                if (this.errorHandler == null) {
                    throw e;
                }
                try {
                    this.invokeErrorHandler(record, producer, iterator, e);
                    // Precedence: (!acked && !autoCommit && isAckAfterHandle()) || producer != null
                    if (!acked && !this.autoCommit && this.errorHandler.isAckAfterHandle() || producer != null) {
                        this.ackCurrent(record, producer);
                    }
                }
                catch (RuntimeException ee) {
                    // Returned rather than thrown so the caller decides how to propagate it.
                    this.logger.error((Throwable)ee, (CharSequence)"Error handler threw an exception");
                    return ee;
                }
                catch (Error er) {
                    this.logger.error((Throwable)er, (CharSequence)"Error handler threw an error");
                    throw er;
                }
            }
            return null;
        }

        /**
         * Validates the record for deserialization failures, invokes the listener and then
         * acknowledges the record.
         * <p>A {@code DeserializationException} found as the record's value or key is thrown
         * directly (value first). When the value or key is {@code null} and the corresponding
         * check flag is set, {@code checkDeser} is consulted for an exception stored in the
         * record headers. The record is acknowledged after the listener returns only if
         * {@code nackSleep} is negative.
         *
         * @param record the record to deliver
         * @param producer the transactional producer, or {@code null}
         */
        private void invokeOnMessage(ConsumerRecord<K, V> record, @Nullable Producer producer) {
            Object value = record.value();
            if (value instanceof DeserializationException) {
                throw (DeserializationException) value;
            }
            Object key = record.key();
            if (key instanceof DeserializationException) {
                throw (DeserializationException) key;
            }
            if (this.checkNullValueForExceptions && value == null) {
                this.checkDeser(record, "springDeserializerExceptionValue");
            }
            if (this.checkNullKeyForExceptions && key == null) {
                this.checkDeser(record, "springDeserializerExceptionKey");
            }
            this.doInvokeOnMessage(record);
            boolean notNacked = this.nackSleep < 0L;
            if (notNacked) {
                this.ackCurrent(record, producer);
            }
        }

        /**
         * Applies the record interceptor (if any) and calls the listener variant that matches
         * {@code listenerType}.
         * <p>If the interceptor returns {@code null} the record is skipped and a debug message
         * is logged. Acknowledging listener variants receive a {@code ConsumerAcknowledgment}
         * only when {@code isAnyManualAck} is set, and {@code null} otherwise.
         *
         * @param recordArg the record to deliver
         */
        private void doInvokeOnMessage(ConsumerRecord<K, V> recordArg) {
            ConsumerRecord<K, V> record = this.recordInterceptor == null
                    ? recordArg
                    : this.recordInterceptor.intercept(recordArg);
            if (record == null) {
                this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + recordArg);
                return;
            }
            switch (this.listenerType) {
                case SIMPLE: {
                    this.listener.onMessage(record);
                    break;
                }
                case CONSUMER_AWARE: {
                    this.listener.onMessage(record, this.consumer);
                    break;
                }
                case ACKNOWLEDGING: {
                    this.listener.onMessage(record, this.isAnyManualAck ? new ConsumerAcknowledgment(record) : null);
                    break;
                }
                case ACKNOWLEDGING_CONSUMER_AWARE: {
                    this.listener.onMessage(record, this.isAnyManualAck ? new ConsumerAcknowledgment(record) : null, this.consumer);
                    break;
                }
            }
        }

        /**
         * Invokes the record error handler for a failed record.
         * <p>A {@code RemainingRecordsErrorHandler} receives the failed record followed by every
         * record still left in {@code iterator} (which is drained), plus the container; pending
         * commits are processed first when there is no producer. Any other handler receives only
         * the failed record. The exception is wrapped by {@code decorateException} in both cases.
         *
         * @param record the record whose processing failed
         * @param producer the transactional producer, or {@code null}
         * @param iterator iterator positioned after {@code record}
         * @param e the exception thrown by the listener
         */
        private void invokeErrorHandler(ConsumerRecord<K, V> record, @Nullable Producer producer, Iterator<ConsumerRecord<K, V>> iterator, RuntimeException e) {
            if (!(this.errorHandler instanceof RemainingRecordsErrorHandler)) {
                this.errorHandler.handle(this.decorateException(e), record, this.consumer);
                return;
            }
            if (producer == null) {
                this.processCommits();
            }
            List<ConsumerRecord<?, ?>> failedAndRemaining = new ArrayList<>();
            failedAndRemaining.add(record);
            while (iterator.hasNext()) {
                failedAndRemaining.add(iterator.next());
            }
            this.errorHandler.handle(this.decorateException(e), failedAndRemaining, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer);
        }

        /**
         * Wraps a listener failure in a {@code ListenerExecutionFailedException} that carries
         * this consumer's group id.
         * <p>An exception that already is a {@code ListenerExecutionFailedException} is
         * re-created with its own message and cause; any other exception becomes the cause of a
         * new one with the message {@code "Listener failed"}.
         *
         * @param e the exception thrown while invoking the listener
         * @return a new {@code ListenerExecutionFailedException} including the consumer group id
         */
        private Exception decorateException(RuntimeException e) {
            // Work on 'e' directly: the decompiled version routed it through an Object-typed
            // local, on which getMessage()/getCause() and the Exception return do not type-check.
            if (e instanceof ListenerExecutionFailedException) {
                return new ListenerExecutionFailedException(e.getMessage(), this.consumerGroupId, e.getCause());
            }
            return new ListenerExecutionFailedException("Listener failed", this.consumerGroupId, e);
        }

        /**
         * Throws the deserialization exception stored under {@code headerName}, if there is one.
         *
         * @param record the record whose headers are inspected
         * @param headerName the header that may hold a {@code DeserializationException}
         * @throws DeserializationException if {@code ListenerUtils.getExceptionFromHeader}
         * returns a non-null exception for the header
         */
        public void checkDeser(ConsumerRecord<K, V> record, String headerName) {
            DeserializationException fromHeader = ListenerUtils.getExceptionFromHeader(record, headerName, this.logger);
            if (fromHeader == null) {
                return;
            }
            throw fromHeader;
        }

        /**
         * Acknowledges {@code record} without a transactional producer.
         * <p>Equivalent to {@code ackCurrent(record, null)}.
         *
         * @param record the record to acknowledge
         */
        public void ackCurrent(ConsumerRecord<K, V> record) {
            this.ackCurrent(record, null);
        }

        /**
         * Acknowledges {@code record}, either by committing its offset right away or by queuing
         * it on {@code acks}.
         * <ul>
         * <li>Per-record ack mode without a producer: offset + 1 is committed immediately
         * (synchronously or asynchronously, depending on {@code syncCommits}).</li>
         * <li>Per-record ack mode with a producer: the record is queued.</li>
         * <li>Other modes: the record is queued when there is a producer, or when neither
         * manual acks nor auto-commit are in use.</li>
         * </ul>
         * When a producer is present the queued offsets are then sent to the transaction; a
         * failure doing so is logged and not propagated.
         *
         * @param record the record to acknowledge
         * @param producer the transactional producer, or {@code null}
         */
        public void ackCurrent(ConsumerRecord<K, V> record, @Nullable Producer producer) {
            final boolean transactional = producer != null;
            if (this.isRecordAck) {
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Collections.singletonMap(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1L));
                if (transactional) {
                    this.acks.add(record);
                } else {
                    this.commitLogger.log(() -> "Committing: " + offsetsToCommit);
                    if (!this.syncCommits) {
                        this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
                    } else {
                        this.consumer.commitSync(offsetsToCommit, this.syncCommitTimeout);
                    }
                }
            } else if (transactional || !(this.isAnyManualAck || this.autoCommit)) {
                this.acks.add(record);
            }
            if (transactional) {
                try {
                    this.sendOffsetsToTransaction(producer);
                }
                catch (Exception e) {
                    this.logger.error((Throwable)e, (CharSequence)"Send offsets to transaction failed");
                }
            }
        }

        /**
         * Sends the currently pending offsets to the producer's transaction.
         * <p>Calls {@code handleAcks()}, builds the commit map from the pending offsets (which
         * clears them) and passes it to {@code producer.sendOffsetsToTransaction} together with
         * the consumer group id.
         *
         * @param producer the transactional producer
         */
        private void sendOffsetsToTransaction(Producer producer) {
            this.handleAcks();
            Map<TopicPartition, OffsetAndMetadata> pendingOffsets = this.buildCommits();
            this.commitLogger.log(() -> "Sending offsets to transaction: " + pendingOffsets);
            producer.sendOffsetsToTransaction(pendingOffsets, this.consumerGroupId);
        }

        /**
         * Processes queued acknowledgments and commits offsets according to the ack mode flags.
         * <p>The ack counter is increased by the number of queued acks and {@code handleAcks()}
         * is called. Nothing further happens in manual-immediate mode. Otherwise queued acks
         * are folded into the pending offsets (unless in manual mode) and:
         * <ul>
         * <li>a commit is issued and the counter reset when the mode is neither time-only nor
         * count-based, or when the count-based threshold has been reached;</li>
         * <li>in all other cases the decision is delegated to {@code timedAcks}.</li>
         * </ul>
         */
        private void processCommits() {
            this.count += this.acks.size();
            this.handleAcks();
            ContainerProperties.AckMode ackMode = this.containerProperties.getAckMode();
            if (this.isManualImmediateAck) {
                return;
            }
            if (!this.isManualAck) {
                this.updatePendingOffsets();
            }
            boolean countExceeded = this.isCountAck && this.count >= this.containerProperties.getAckCount();
            boolean neitherTimeNorCount = !(this.isTimeOnlyAck || this.isCountAck);
            if (!(neitherTimeNorCount || countExceeded)) {
                this.timedAcks(ackMode);
                return;
            }
            if (this.isCountAck) {
                this.logger.debug(() -> "Committing in " + ackMode.name() + " because count " + this.count + " exceeds configured limit of " + this.containerProperties.getAckCount());
            }
            this.commitIfNecessary();
            this.count = 0;
        }

        /**
         * Commits pending offsets when the configured ack time has elapsed since the last
         * time-based commit.
         * <p>Only acts for {@code AckMode.TIME} and {@code AckMode.COUNT_TIME}. After committing,
         * the last-commit timestamp is updated; for {@code COUNT_TIME} the ack counter is reset
         * as well.
         *
         * @param ackMode the container's ack mode
         */
        private void timedAcks(ContainerProperties.AckMode ackMode) {
            final long now = System.currentTimeMillis();
            final boolean elapsed = now - this.last > this.containerProperties.getAckTime();
            final boolean timeMode = ackMode.equals((Object)ContainerProperties.AckMode.TIME);
            final boolean countTimeMode = !timeMode && ackMode.equals((Object)ContainerProperties.AckMode.COUNT_TIME);
            if (!elapsed || !(timeMode || countTimeMode)) {
                return;
            }
            if (timeMode) {
                this.logger.debug(() -> "Committing in AckMode.TIME because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime());
            } else {
                this.logger.debug(() -> "Committing in AckMode.COUNT_TIME because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime());
            }
            this.commitIfNecessary();
            this.last = now;
            if (countTimeMode) {
                this.count = 0;
            }
        }

        /**
         * Drains the queued seek requests and applies each of them to the consumer.
         * <p>Timestamp seeks are handled in bulk first by {@code processTimestampSeeks()}. Each
         * remaining request is then applied according to its position:
         * <ul>
         * <li>no position: seek to the absolute offset, or to current position + offset
         * (clamped at 0) when the request is relative to the current position;</li>
         * <li>{@code BEGINNING}: seek to the beginning, then to the given offset if there is one;</li>
         * <li>{@code TIMESTAMP}: look up the offset for the timestamp and seek to it; partitions
         * for which no offset is found are left untouched;</li>
         * <li>otherwise (end): seek to the end, then to end position + offset if there is one.</li>
         * </ul>
         * A failure while applying one request is logged and does not stop the remaining
         * requests from being processed.
         */
        private void processSeeks() {
            this.processTimestampSeeks();
            TopicPartitionOffset offset = (TopicPartitionOffset)this.seeks.poll();
            while (offset != null) {
                this.traceSeek(offset);
                try {
                    TopicPartitionOffset.SeekPosition position = offset.getPosition();
                    TopicPartition topicPartition = offset.getTopicPartition();
                    Long whereTo = offset.getOffset();
                    if (position == null) {
                        if (offset.isRelativeToCurrent()) {
                            whereTo = Math.max(whereTo + this.consumer.position(topicPartition), 0L);
                        }
                        this.consumer.seek(topicPartition, whereTo.longValue());
                    } else if (position.equals((Object)TopicPartitionOffset.SeekPosition.BEGINNING)) {
                        this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
                        if (whereTo != null) {
                            this.consumer.seek(topicPartition, whereTo.longValue());
                        }
                    } else if (position.equals((Object)TopicPartitionOffset.SeekPosition.TIMESTAMP)) {
                        // offsetsForTimes maps a partition to null when no offset matches the
                        // timestamp; skip those entries instead of failing with an NPE.
                        this.consumer.offsetsForTimes(Collections.singletonMap(topicPartition, whereTo))
                                .forEach((tp, ot) -> {
                                    if (ot != null) {
                                        this.consumer.seek(tp, ot.offset());
                                    }
                                });
                    } else {
                        this.consumer.seekToEnd(Collections.singletonList(topicPartition));
                        if (whereTo != null) {
                            whereTo = whereTo + this.consumer.position(topicPartition);
                            this.consumer.seek(topicPartition, whereTo.longValue());
                        }
                    }
                }
                catch (Exception e) {
                    // 'offset' is reassigned below, so capture the current value for the lambda.
                    TopicPartitionOffset offsetToLog = offset;
                    this.logger.error((Throwable)e, () -> "Exception while seeking " + offsetToLog);
                }
                offset = (TopicPartitionOffset)this.seeks.poll();
            }
        }

        /**
         * Removes all queued timestamp seeks and applies them with a single
         * {@code offsetsForTimes} lookup.
         * <p>Every request whose position is {@code TIMESTAMP} is taken out of the seek queue
         * and collected into a partition-to-timestamp map (a later request for the same
         * partition replaces an earlier one). The consumer is then positioned at the offset
         * returned for each partition. Partitions for which the lookup yields no offset are
         * left untouched.
         */
        private void processTimestampSeeks() {
            Iterator<TopicPartitionOffset> seekIterator = this.seeks.iterator();
            Map<TopicPartition, Long> timestampSeeks = null;
            while (seekIterator.hasNext()) {
                TopicPartitionOffset tpo = (TopicPartitionOffset)seekIterator.next();
                if (!TopicPartitionOffset.SeekPosition.TIMESTAMP.equals((Object)tpo.getPosition())) {
                    continue;
                }
                // Allocate lazily: most polls have no timestamp seeks queued.
                if (timestampSeeks == null) {
                    timestampSeeks = new HashMap<TopicPartition, Long>();
                }
                timestampSeeks.put(tpo.getTopicPartition(), tpo.getOffset());
                seekIterator.remove();
                this.traceSeek(tpo);
            }
            if (timestampSeeks != null) {
                // offsetsForTimes maps a partition to null when no offset matches the timestamp;
                // without the guard an NPE would escape from here.
                this.consumer.offsetsForTimes(timestampSeeks).forEach((tp, ot) -> {
                    if (ot != null) {
                        this.consumer.seek(tp, ot.offset());
                    }
                });
            }
        }

        /**
         * Logs the given seek request at trace level.
         *
         * @param offset the seek request being processed
         */
        private void traceSeek(TopicPartitionOffset offset) {
            this.logger.trace(() -> "Seek: " + offset);
        }

        /**
         * Positions the consumer on the explicitly defined partitions.
         * <p>Works on a copy of {@code definedPartitions}. Partitions marked {@code BEGINNING}
         * or {@code END} are sought in bulk. For each remaining partition with a non-null
         * offset the target is:
         * <ul>
         * <li>negative offset: position + offset, clamped at 0, after first seeking to the end
         * unless the offset is relative to the current position;</li>
         * <li>non-negative offset relative to current: position + offset;</li>
         * <li>otherwise the offset itself.</li>
         * </ul>
         * A failure of the final seek for a partition is logged and the loop continues.
         */
        private void initPartitionsIfNeeded() {
            Map<TopicPartition, OffsetMetadata> pending = new HashMap<>(this.definedPartitions);

            Set<TopicPartition> toBeginning = pending.entrySet().stream()
                    .filter(candidate -> TopicPartitionOffset.SeekPosition.BEGINNING.equals((Object)candidate.getValue().seekPosition))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toSet());
            pending.keySet().removeAll(toBeginning);

            Set<TopicPartition> toEnd = pending.entrySet().stream()
                    .filter(candidate -> TopicPartitionOffset.SeekPosition.END.equals((Object)candidate.getValue().seekPosition))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toSet());
            pending.keySet().removeAll(toEnd);

            if (!toBeginning.isEmpty()) {
                this.consumer.seekToBeginning(toBeginning);
            }
            if (!toEnd.isEmpty()) {
                this.consumer.seekToEnd(toEnd);
            }

            for (Map.Entry<TopicPartition, OffsetMetadata> entry : pending.entrySet()) {
                final TopicPartition topicPartition = entry.getKey();
                final OffsetMetadata metadata = entry.getValue();
                final Long configured = metadata.offset;
                if (configured == null) {
                    continue;
                }
                final long requested = configured;
                final long target;
                if (requested < 0L) {
                    if (!metadata.relativeToCurrent) {
                        this.consumer.seekToEnd(Collections.singletonList(topicPartition));
                    }
                    target = Math.max(0L, this.consumer.position(topicPartition) + requested);
                } else if (metadata.relativeToCurrent) {
                    target = this.consumer.position(topicPartition) + requested;
                } else {
                    target = requested;
                }
                try {
                    this.consumer.seek(topicPartition, target);
                    this.logReset(topicPartition, target);
                }
                catch (Exception seekFailure) {
                    this.logger.error((Throwable)seekFailure, () -> "Failed to set initial offset for " + topicPartition + " at " + target + ". Position is " + this.consumer.position(topicPartition));
                }
            }
        }

        /**
         * Logs, at debug level, that a partition was reset to a new offset.
         *
         * @param topicPartition the partition that was repositioned
         * @param newOffset the offset it was repositioned to
         */
        private void logReset(TopicPartition topicPartition, long newOffset) {
            this.logger.debug(() -> "Reset " + topicPartition + " to offset " + newOffset);
        }

        /**
         * Drains the {@code acks} queue, recording each acknowledged record in the pending
         * offsets via {@code addOffset}.
         */
        private void updatePendingOffsets() {
            for (ConsumerRecord<K, V> acked = (ConsumerRecord<K, V>)this.acks.poll();
                    acked != null;
                    acked = (ConsumerRecord<K, V>)this.acks.poll()) {
                this.addOffset(acked);
            }
        }

        /**
         * Records the offset of {@code record} as pending for its topic and partition, keeping
         * the highest offset seen so far for that partition.
         *
         * @param record the acknowledged record
         */
        private void addOffset(ConsumerRecord<K, V> record) {
            Map<Integer, Long> offsetsByPartition = this.offsets.computeIfAbsent(record.topic(), topic -> new ConcurrentHashMap());
            // merge stores the offset when the partition is new, otherwise keeps the maximum.
            offsetsByPartition.merge(record.partition(), record.offset(), Long::max);
        }

        /**
         * Commits the pending offsets, if there are any.
         * <p>The commit map is built from (and clears) the pending offsets. The commit is
         * synchronous or asynchronous depending on {@code syncCommits}. A
         * {@code WakeupException} raised during the commit is logged at debug level and
         * swallowed.
         */
        private void commitIfNecessary() {
            Map<TopicPartition, OffsetAndMetadata> commits = this.buildCommits();
            this.logger.debug(() -> "Commit list: " + commits);
            if (commits.isEmpty()) {
                return;
            }
            this.commitLogger.log(() -> "Committing: " + commits);
            try {
                if (!this.syncCommits) {
                    this.consumer.commitAsync(commits, this.commitCallback);
                } else {
                    this.consumer.commitSync(commits, this.syncCommitTimeout);
                }
            }
            catch (WakeupException e) {
                this.logger.debug((CharSequence)"Woken up during commit");
            }
        }

        /**
         * Converts the pending offsets into a commit map and clears the pending offsets.
         *
         * @return one entry per pending topic/partition, mapped to the recorded offset plus one
         */
        private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
            Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
            this.offsets.forEach((topic, offsetsByPartition) ->
                    offsetsByPartition.forEach((partition, offset) ->
                            commits.put(new TopicPartition(topic, partition.intValue()), new OffsetAndMetadata(offset + 1L))));
            this.offsets.clear();
            return commits;
        }

        /**
         * Returns, for every partition present in {@code records}, the last record in that
         * partition's list.
         *
         * @param records the polled records
         * @return the last record of each partition, in no particular order
         */
        private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(ConsumerRecords<K, V> records) {
            Map<TopicPartition, ConsumerRecord<K, V>> lastByPartition = new HashMap<>();
            for (TopicPartition tp : records.partitions()) {
                List<ConsumerRecord<K, V>> partitionRecords = records.records(tp);
                lastByPartition.put(tp, partitionRecords.get(partitionRecords.size() - 1));
            }
            return lastByPartition.values();
        }

        /**
         * Queues a seek to an absolute offset. The request is added to {@code seeks} and
         * applied later when {@code processSeeks()} drains the queue.
         *
         * @param topic the topic
         * @param partition the partition
         * @param offset the absolute offset to seek to
         */
        @Override
        public void seek(String topic, int partition, long offset) {
            this.seeks.add(new TopicPartitionOffset(topic, partition, offset));
        }

        /**
         * Queues a seek to the beginning of a single partition. The request is added to
         * {@code seeks} and applied later when {@code processSeeks()} drains the queue.
         *
         * @param topic the topic
         * @param partition the partition
         */
        @Override
        public void seekToBeginning(String topic, int partition) {
            this.seeks.add(new TopicPartitionOffset(topic, partition, TopicPartitionOffset.SeekPosition.BEGINNING));
        }

        /**
         * Queues a seek to the beginning for each of the given partitions. The requests are
         * added to {@code seeks} and applied later when {@code processSeeks()} drains the queue.
         *
         * @param partitions the partitions to rewind
         */
        @Override
        public void seekToBeginning(Collection<TopicPartition> partitions) {
            List<TopicPartitionOffset> requests = partitions.stream()
                    .map(tp -> new TopicPartitionOffset(tp.topic(), tp.partition(), TopicPartitionOffset.SeekPosition.BEGINNING))
                    .collect(Collectors.toList());
            this.seeks.addAll(requests);
        }

        /**
         * Queues a seek to the end of a single partition. The request is added to
         * {@code seeks} and applied later when {@code processSeeks()} drains the queue.
         *
         * @param topic the topic
         * @param partition the partition
         */
        @Override
        public void seekToEnd(String topic, int partition) {
            this.seeks.add(new TopicPartitionOffset(topic, partition, TopicPartitionOffset.SeekPosition.END));
        }

        /**
         * Queues a seek to the end for each of the given partitions. The requests are added to
         * {@code seeks} and applied later when {@code processSeeks()} drains the queue.
         *
         * @param partitions the partitions to fast-forward
         */
        @Override
        public void seekToEnd(Collection<TopicPartition> partitions) {
            List<TopicPartitionOffset> requests = partitions.stream()
                    .map(tp -> new TopicPartitionOffset(tp.topic(), tp.partition(), TopicPartitionOffset.SeekPosition.END))
                    .collect(Collectors.toList());
            this.seeks.addAll(requests);
        }

        /**
         * Queues a relative seek. The request is added to {@code seeks} and applied later when
         * {@code processSeeks()} drains the queue.
         * <p>When {@code toCurrent} is set the request is relative to the current position.
         * Otherwise a non-negative offset is queued with position {@code BEGINNING} and a
         * negative offset with position {@code END}.
         *
         * @param topic the topic
         * @param partition the partition
         * @param offset the relative offset
         * @param toCurrent whether the offset is relative to the current position
         */
        @Override
        public void seekRelative(String topic, int partition, long offset, boolean toCurrent) {
            final Long boxedOffset = Long.valueOf(offset);
            final TopicPartitionOffset request;
            if (toCurrent) {
                request = new TopicPartitionOffset(topic, partition, boxedOffset, toCurrent);
            } else {
                TopicPartitionOffset.SeekPosition anchor = offset < 0L
                        ? TopicPartitionOffset.SeekPosition.END
                        : TopicPartitionOffset.SeekPosition.BEGINNING;
                request = new TopicPartitionOffset(topic, partition, boxedOffset, anchor);
            }
            this.seeks.add(request);
        }

        /**
         * Queues a seek to a timestamp for a single partition. The request is added to
         * {@code seeks} with position {@code TIMESTAMP}; the timestamp is carried in the
         * request's offset field.
         *
         * @param topic the topic
         * @param partition the partition
         * @param timestamp the timestamp to seek to
         */
        @Override
        public void seekToTimestamp(String topic, int partition, long timestamp) {
            this.seeks.add(new TopicPartitionOffset(topic, partition, (Long)timestamp, TopicPartitionOffset.SeekPosition.TIMESTAMP));
        }

        /**
         * Queues a seek to the same timestamp for each of the given partitions by delegating
         * to the single-partition overload.
         *
         * @param topicParts the partitions to seek
         * @param timestamp the timestamp to seek to
         */
        @Override
        public void seekToTimestamp(Collection<TopicPartition> topicParts, long timestamp) {
            topicParts.forEach(tp -> this.seekToTimestamp(tp.topic(), tp.partition(), timestamp));
        }

        /**
         * Returns a diagnostic description of this listener consumer: container properties,
         * listener type and flags, auto-commit setting, consumer group id and the container's
         * client id suffix.
         */
        public String toString() {
            return "KafkaMessageListenerContainer.ListenerConsumer [containerProperties=" + this.containerProperties + ", listenerType=" + (Object)((Object)this.listenerType) + ", isConsumerAwareListener=" + this.isConsumerAwareListener + ", isBatchListener=" + this.isBatchListener + ", autoCommit=" + this.autoCommit + ", consumerGroupId=" + this.consumerGroupId + ", clientIdSuffix=" + KafkaMessageListenerContainer.this.clientIdSuffix + "]";
        }

        /**
         * Closes the producers associated with the given partitions.
         * <p>For each partition the producer factory of the Kafka transaction manager is asked
         * to close the producer identified by the partition's transaction id suffix. A failure
         * for one partition is logged and does not prevent the others from being closed.
         * Does nothing when {@code partitions} is {@code null}.
         *
         * @param partitions the partitions whose producers should be closed, or {@code null}
         */
        private void closeProducers(@Nullable Collection<TopicPartition> partitions) {
            if (partitions == null) {
                return;
            }
            ProducerFactory producerFactory = this.kafkaTxManager.getProducerFactory();
            for (TopicPartition tp : partitions) {
                try {
                    producerFactory.closeProducerFor(this.zombieFenceTxIdSuffix(tp.topic(), tp.partition()));
                }
                catch (Exception e) {
                    this.logger.error((Throwable)e, () -> "Failed to close producer with transaction id suffix: " + this.zombieFenceTxIdSuffix(tp.topic(), tp.partition()));
                }
            }
        }

        /**
         * Builds the transaction id suffix for a topic/partition, in the form
         * {@code <consumerGroupId>.<topic>.<partition>}.
         *
         * @param topic the topic
         * @param partition the partition
         * @return the suffix string
         */
        private String zombieFenceTxIdSuffix(String topic, int partition) {
            return this.consumerGroupId + "." + topic + "." + partition;
        }

        /**
         * Seek callback that acts on the enclosing consumer directly instead of queuing the
         * request.
         */
        private final class InitialOrIdleSeekCallback implements ConsumerSeekAware.ConsumerSeekCallback {

            InitialOrIdleSeekCallback() {
            }

            /** Seeks the partition to the given absolute offset. */
            @Override
            public void seek(String topic, int partition, long offset) {
                TopicPartition target = new TopicPartition(topic, partition);
                ListenerConsumer.this.consumer.seek(target, offset);
            }

            /** Seeks a single partition to its beginning. */
            @Override
            public void seekToBeginning(String topic, int partition) {
                TopicPartition target = new TopicPartition(topic, partition);
                ListenerConsumer.this.consumer.seekToBeginning(Collections.singletonList(target));
            }

            /** Seeks all given partitions to their beginning. */
            @Override
            public void seekToBeginning(Collection<TopicPartition> partitions) {
                ListenerConsumer.this.consumer.seekToBeginning(partitions);
            }

            /** Seeks a single partition to its end. */
            @Override
            public void seekToEnd(String topic, int partition) {
                TopicPartition target = new TopicPartition(topic, partition);
                ListenerConsumer.this.consumer.seekToEnd(Collections.singletonList(target));
            }

            /** Seeks all given partitions to their end. */
            @Override
            public void seekToEnd(Collection<TopicPartition> partitions) {
                ListenerConsumer.this.consumer.seekToEnd(partitions);
            }

            /**
             * Seeks relative to the beginning/end of the partition or to the current position.
             * Non-negative offsets are resolved by {@code computeForwardWhereTo}, negative ones
             * by {@code computeBackwardWhereTo}. No seek is performed when no base offset
             * could be determined.
             */
            @Override
            public void seekRelative(String topic, int partition, long offset, boolean toCurrent) {
                TopicPartition topicPart = new TopicPartition(topic, partition);
                Consumer<K, V> consumerToSeek = ListenerConsumer.this.consumer;
                Long whereTo;
                if (offset < 0L) {
                    whereTo = this.computeBackwardWhereTo(offset, toCurrent, topicPart, consumerToSeek);
                } else {
                    whereTo = this.computeForwardWhereTo(offset, toCurrent, topicPart, consumerToSeek);
                }
                if (whereTo == null) {
                    return;
                }
                consumerToSeek.seek(topicPart, whereTo.longValue());
            }

            /** Seeks a single partition to the offset found for the given timestamp, if any. */
            @Override
            public void seekToTimestamp(String topic, int partition, long timestamp) {
                this.seekToOffsetsForTimes(Collections.singletonMap(new TopicPartition(topic, partition), timestamp));
            }

            /** Seeks each given partition to the offset found for the given timestamp, if any. */
            @Override
            public void seekToTimestamp(Collection<TopicPartition> topicParts, long timestamp) {
                Map<TopicPartition, Long> timestampsByPartition = topicParts.stream().collect(Collectors.toMap(tp -> tp, tp -> timestamp));
                this.seekToOffsetsForTimes(timestampsByPartition);
            }

            /**
             * Looks up the offsets for the given timestamps and seeks every partition for which
             * the lookup returned a non-null result.
             */
            private void seekToOffsetsForTimes(Map<TopicPartition, Long> timestampsByPartition) {
                Consumer<K, V> consumerToSeek = ListenerConsumer.this.consumer;
                consumerToSeek.offsetsForTimes(timestampsByPartition).forEach((tp, ot) -> {
                    if (ot != null) {
                        consumerToSeek.seek(tp, ot.offset());
                    }
                });
            }

            /**
             * Resolves a non-negative relative offset: current position + offset when
             * {@code toCurrent}, otherwise beginning offset + offset.
             *
             * @return the absolute offset, or {@code null} if no beginning offset was returned
             */
            @Nullable
            private Long computeForwardWhereTo(long offset, boolean toCurrent, TopicPartition topicPart, Consumer<K, V> consumerToSeek) {
                if (toCurrent) {
                    return consumerToSeek.position(topicPart) + offset;
                }
                Long beginning = consumerToSeek.beginningOffsets(Collections.singletonList(topicPart)).get(topicPart);
                if (beginning == null) {
                    return null;
                }
                return beginning + offset;
            }

            /**
             * Resolves a negative relative offset: current position + offset when
             * {@code toCurrent}, otherwise end offset + offset; the result is clamped at 0.
             *
             * @return the absolute offset, or {@code null} if no end offset was returned
             */
            @Nullable
            private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartition topicPart, Consumer<K, V> consumerToSeek) {
                Long end;
                if (toCurrent) {
                    end = consumerToSeek.position(topicPart);
                } else {
                    end = consumerToSeek.endOffsets(Collections.singletonList(topicPart)).get(topicPart);
                }
                if (end == null) {
                    return null;
                }
                return Math.max(0L, end + offset);
            }
        }

        private class ListenerConsumerRebalanceListener
        implements ConsumerRebalanceListener {
            private final ConsumerRebalanceListener userListener;
            private final ConsumerAwareRebalanceListener consumerAwareListener;

            ListenerConsumerRebalanceListener() {
                this.userListener = KafkaMessageListenerContainer.this.getContainerProperties().getConsumerRebalanceListener();
                this.consumerAwareListener = this.userListener instanceof ConsumerAwareRebalanceListener ? (ConsumerAwareRebalanceListener)this.userListener : null;
            }

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                try {
                    if (this.consumerAwareListener != null) {
                        this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer, partitions);
                    } else {
                        this.userListener.onPartitionsRevoked(partitions);
                    }
                    ListenerConsumer.this.commitPendingAcks();
                    if (this.consumerAwareListener != null) {
                        this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer, partitions);
                    }
                    if (ListenerConsumer.this.consumerSeekAwareListener != null) {
                        ListenerConsumer.this.consumerSeekAwareListener.onPartitionsRevoked(partitions);
                    }
                    if (ListenerConsumer.this.assignedPartitions != null) {
                        ListenerConsumer.this.assignedPartitions.removeAll(partitions);
                    }
                }
                finally {
                    if (ListenerConsumer.this.kafkaTxManager != null) {
                        ListenerConsumer.this.closeProducers(partitions);
                    }
                }
            }

            /**
             * Handles assignment of the given partitions.
             *
             * <p>If the consumer is flagged as paused, the new partitions are paused
             * again and a warning is logged. The tracked assignment is then set to a
             * copy of {@code partitions}. When commit-on-assignment is enabled, the
             * current position of every partition is collected and committed; if a
             * partition has no offset and no reset policy, the fatal-error flag is
             * set, the error is logged and the method returns without notifying any
             * listener. Otherwise the seek-aware handling runs (when the listener is
             * a {@code ConsumerSeekAware}) followed by the configured rebalance
             * listener.
             *
             * <p>NOTE(review): the tracked assignment is replaced, not merged, with
             * the partitions passed in - confirm this is intended when an incremental
             * (cooperative) assignor is in use.
             *
             * @param partitions the partitions passed to this callback
             */
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                if (ListenerConsumer.this.consumerPaused) {
                    ListenerConsumer.this.consumer.pause(partitions);
                    ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; consumer paused again, so the initial poll() will never return any records");
                }
                ListenerConsumer.this.assignedPartitions = new LinkedList<TopicPartition>(partitions);

                if (ListenerConsumer.this.commitCurrentOnAssignment) {
                    Map<TopicPartition, OffsetAndMetadata> currentPositions = new HashMap<>();
                    for (TopicPartition assigned : partitions) {
                        long nextOffset;
                        try {
                            nextOffset = ListenerConsumer.this.consumer.position(assigned);
                        }
                        catch (NoOffsetForPartitionException noOffset) {
                            // Flag the error and skip all further handling of this assignment.
                            ListenerConsumer.this.fatalError = true;
                            ListenerConsumer.this.logger.error(noOffset, "No offset and no reset policy");
                            return;
                        }
                        currentPositions.put(assigned, new OffsetAndMetadata(nextOffset));
                    }
                    this.commitCurrentOffsets(currentPositions);
                }

                if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
                    ListenerConsumer.this.seekPartitions(partitions, false);
                }

                if (this.consumerAwareListener == null) {
                    this.userListener.onPartitionsAssigned(partitions);
                } else {
                    this.consumerAwareListener.onPartitionsAssigned(ListenerConsumer.this.consumer, partitions);
                }
            }

            /**
             * Commits the supplied offsets on assignment.
             *
             * <p>When a transaction template and a Kafka transaction manager are both
             * present and the assignment commit option is not
             * {@code LATEST_ONLY_NO_TX}, each partition's offset is sent in its own
             * transaction: the transaction id suffix is set per partition before the
             * template executes, and cleared once all partitions have been handled
             * (or on failure). Inside the transaction the offset is only sent when a
             * resource holder is bound for the transaction manager's producer factory.
             *
             * <p>Otherwise the offsets are committed directly on the consumer,
             * synchronously or asynchronously according to the container properties.
             *
             * @param offsetsToCommit the offsets to commit, keyed by partition
             */
            private void commitCurrentOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
                ListenerConsumer.this.commitLogger.log(() -> "Committing on assignment: " + offsetsToCommit);
                boolean transactional = ListenerConsumer.this.transactionTemplate != null
                        && ListenerConsumer.this.kafkaTxManager != null
                        && !ContainerProperties.AssignmentCommitOption.LATEST_ONLY_NO_TX.equals(ListenerConsumer.this.autoCommitOption);
                if (transactional) {
                    try {
                        offsetsToCommit.forEach((partition, offsetAndMetadata) -> {
                            TransactionSupport.setTransactionIdSuffix(ListenerConsumer.this.zombieFenceTxIdSuffix(partition.topic(), partition.partition()));
                            // The callback captures the (effectively final) lambda parameters directly.
                            ListenerConsumer.this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {

                                @SuppressWarnings({ "unchecked", "rawtypes" })
                                @Override
                                protected void doInTransactionWithoutResult(TransactionStatus status) {
                                    KafkaResourceHolder holder = (KafkaResourceHolder) TransactionSynchronizationManager
                                            .getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory());
                                    if (holder != null) {
                                        holder.getProducer().sendOffsetsToTransaction(
                                                Collections.singletonMap(partition, offsetAndMetadata),
                                                ListenerConsumer.this.consumerGroupId);
                                    }
                                }
                            });
                        });
                    }
                    finally {
                        TransactionSupport.clearTransactionIdSuffix();
                    }
                } else {
                    ContainerProperties containerProps = KafkaMessageListenerContainer.this.getContainerProperties();
                    if (containerProps.isSyncCommits()) {
                        ListenerConsumer.this.consumer.commitSync(offsetsToCommit, containerProps.getSyncCommitTimeout());
                    } else {
                        ListenerConsumer.this.consumer.commitAsync(offsetsToCommit, containerProps.getCommitCallback());
                    }
                }
            }

            /**
             * Handles loss of the given partitions: the configured listener is
             * notified first (through the consumer-aware variant when available),
             * then the regular revocation handling of this class is applied to the
             * same partitions.
             *
             * @param partitions the partitions that were lost
             */
            @Override
            public void onPartitionsLost(Collection<TopicPartition> partitions) {
                if (this.consumerAwareListener == null) {
                    this.userListener.onPartitionsLost(partitions);
                } else {
                    this.consumerAwareListener.onPartitionsLost(ListenerConsumer.this.consumer, partitions);
                }
                // Reuse the revocation path for the commit and bookkeeping steps.
                this.onPartitionsRevoked(partitions);
            }
        }

        /**
         * {@link Acknowledgment} bound to a whole {@link ConsumerRecords} batch.
         */
        private final class ConsumerBatchAcknowledgment
        implements Acknowledgment {
            /** The batch this acknowledgment refers to. */
            private final ConsumerRecords<K, V> records;

            /**
             * @param records the batch this acknowledgment refers to
             */
            ConsumerBatchAcknowledgment(ConsumerRecords<K, V> records) {
                this.records = records;
            }

            /**
             * Passes every record returned by {@code getHighestOffsetRecords} for
             * this batch to {@code processAck}.
             */
            @Override
            public void acknowledge() {
                ListenerConsumer.this.getHighestOffsetRecords(this.records)
                        .forEach(highest -> ListenerConsumer.this.processAck(highest));
            }

            /**
             * Records a negative acknowledgment for the batch by storing the index
             * and sleep time on the enclosing consumer.
             *
             * @param index position within the batch; must be in {@code [0, count)}
             * @param sleep the sleep value to store; must not be negative
             * @throws IllegalStateException if not called on the consumer thread
             * @throws IllegalArgumentException if {@code sleep} is negative or
             * {@code index} is out of bounds
             */
            @Override
            public void nack(int index, long sleep) {
                boolean onConsumerThread = Thread.currentThread().equals(ListenerConsumer.this.consumerThread);
                Assert.state(onConsumerThread, "nack() can only be called on the consumer thread");
                Assert.isTrue(sleep >= 0L, "sleep cannot be negative");
                boolean indexInRange = index >= 0 && index < this.records.count();
                Assert.isTrue(indexInRange, "index out of bounds");
                ListenerConsumer.this.nackIndex = index;
                ListenerConsumer.this.nackSleep = sleep;
            }

            @Override
            public String toString() {
                return "Acknowledgment for " + this.records;
            }
        }

        /**
         * {@link Acknowledgment} bound to a single {@link ConsumerRecord}.
         */
        private final class ConsumerAcknowledgment
        implements Acknowledgment {
            /** The record this acknowledgment refers to. */
            private final ConsumerRecord<K, V> record;

            /**
             * @param record the record this acknowledgment refers to
             */
            ConsumerAcknowledgment(ConsumerRecord<K, V> record) {
                this.record = record;
            }

            /** Passes the record to {@code processAck} on the enclosing consumer. */
            @Override
            public void acknowledge() {
                ListenerConsumer.this.processAck(this.record);
            }

            /**
             * Records a negative acknowledgment by storing the sleep time on the
             * enclosing consumer.
             *
             * @param sleep the sleep value to store; must not be negative
             * @throws IllegalStateException if not called on the consumer thread
             * @throws IllegalArgumentException if {@code sleep} is negative
             */
            @Override
            public void nack(long sleep) {
                boolean onConsumerThread = Thread.currentThread().equals(ListenerConsumer.this.consumerThread);
                Assert.state(onConsumerThread, "nack() can only be called on the consumer thread");
                Assert.isTrue(sleep >= 0L, "sleep cannot be negative");
                ListenerConsumer.this.nackSleep = sleep;
            }

            @Override
            public String toString() {
                return "Acknowledgment for " + this.record;
            }
        }
    }
}

