package org.springframework.kafka.listener;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer.class */
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    private final ConsumerFactory<K, V> consumerFactory;
    private final String[] topics;
    private final Pattern topicPattern;
    private final TopicPartition[] partitions;
    private KafkaMessageListenerContainer<K, V>.ListenerConsumer listenerConsumer;
    private long recentOffset;
    private MessageListener<K, V> listener;
    private AcknowledgingMessageListener<K, V> acknowledgingMessageListener;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$CommitCallback.class */
    public static final class CommitCallback implements OffsetCommitCallback {
        private static final Log logger = LogFactory.getLog(OffsetCommitCallback.class);

        private CommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            if (exc != null) {
                logger.error("Commit failed for " + map, exc);
            } else if (logger.isDebugEnabled()) {
                logger.debug("Commits for " + map + " completed");
            }
        }
    }

    /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$ListenerConsumer.class */
    private class ListenerConsumer implements SchedulingAwareRunnable {
        private final Consumer<K, V> consumer;
        private final MessageListener<K, V> listener;
        private final AcknowledgingMessageListener<K, V> acknowledgingMessageListener;
        private final long recentOffset;
        private final boolean autoCommit;
        private Thread consumerThread;
        private volatile Collection<TopicPartition> definedPartitions;
        private volatile Collection<TopicPartition> assignedPartitions;
        private int count;
        private long last;
        private final Log logger = LogFactory.getLog(ListenerConsumer.class);
        private final CommitCallback callback = new CommitCallback();
        private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> manualOffsets = new ConcurrentHashMap();
        private final Map<String, Map<Integer, Long>> offsets = new HashMap();

        ListenerConsumer(MessageListener<K, V> messageListener, AcknowledgingMessageListener<K, V> acknowledgingMessageListener, long j) {
            this.autoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
            Assert.state(((KafkaMessageListenerContainer.this.getAckMode().equals(AbstractMessageListenerContainer.AckMode.MANUAL) || KafkaMessageListenerContainer.this.getAckMode().equals(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE)) && this.autoCommit) ? false : true, "Consumer cannot be configured for auto commit for ackMode " + KafkaMessageListenerContainer.this.getAckMode());
            Consumer<K, V> createConsumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
            ConsumerRebalanceListener consumerRebalanceListener = new ConsumerRebalanceListener() { // from class: org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer.1
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    KafkaMessageListenerContainer.this.logger.info("partitions revoked:" + collection);
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    ListenerConsumer.this.assignedPartitions = collection;
                    KafkaMessageListenerContainer.this.logger.info("partitions assigned:" + collection);
                }
            };
            if (KafkaMessageListenerContainer.this.partitions != null) {
                List asList = Arrays.asList(KafkaMessageListenerContainer.this.partitions);
                this.definedPartitions = asList;
                createConsumer.assign(asList);
            } else if (KafkaMessageListenerContainer.this.topicPattern != null) {
                createConsumer.subscribe(KafkaMessageListenerContainer.this.topicPattern, consumerRebalanceListener);
            } else {
                createConsumer.subscribe(Arrays.asList(KafkaMessageListenerContainer.this.topics), consumerRebalanceListener);
            }
            this.consumer = createConsumer;
            this.listener = messageListener;
            this.acknowledgingMessageListener = acknowledgingMessageListener;
            this.recentOffset = j;
        }

        public boolean isLongLived() {
            return true;
        }

        public void run() {
            this.consumerThread = Thread.currentThread();
            this.count = 0;
            this.last = System.currentTimeMillis();
            if (KafkaMessageListenerContainer.this.isRunning() && this.definedPartitions != null) {
                initPartitionsIfNeeded();
            }
            AbstractMessageListenerContainer.AckMode ackMode = KafkaMessageListenerContainer.this.getAckMode();
            while (KafkaMessageListenerContainer.this.isRunning()) {
                try {
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("Polling...");
                    }
                    ConsumerRecords<K, V> poll = this.consumer.poll(KafkaMessageListenerContainer.this.getPollTimeout());
                    if (poll != null) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Received: " + poll.count() + " records");
                        }
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            ConsumerRecord<K, V> consumerRecord = (ConsumerRecord) it.next();
                            invokeListener(consumerRecord);
                            if (!this.autoCommit && ackMode.equals(AbstractMessageListenerContainer.AckMode.RECORD)) {
                                this.consumer.commitAsync(Collections.singletonMap(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1)), this.callback);
                            }
                        }
                        if (!this.autoCommit) {
                            processCommits(ackMode, poll);
                        }
                    } else if (this.logger.isDebugEnabled()) {
                        this.logger.debug("No records");
                    }
                } catch (WakeupException e) {
                } catch (Exception e2) {
                    if (KafkaMessageListenerContainer.this.getErrorHandler() != null) {
                        KafkaMessageListenerContainer.this.getErrorHandler().handle(e2, null);
                    } else {
                        this.logger.error("Container exception", e2);
                    }
                }
            }
            if (this.offsets.size() > 0) {
                commitIfNecessary();
            }
            try {
                this.consumer.unsubscribe();
            } catch (WakeupException e3) {
            }
            this.consumer.close();
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Consumer stopped");
            }
        }

        private void invokeListener(final ConsumerRecord<K, V> consumerRecord) {
            try {
                if (this.acknowledgingMessageListener != null) {
                    this.acknowledgingMessageListener.onMessage(consumerRecord, new Acknowledgment() { // from class: org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer.2
                        @Override // org.springframework.kafka.support.Acknowledgment
                        public void acknowledge() {
                            if (KafkaMessageListenerContainer.this.getAckMode().equals(AbstractMessageListenerContainer.AckMode.MANUAL)) {
                                ListenerConsumer.this.updateManualOffset(consumerRecord);
                                return;
                            }
                            if (!KafkaMessageListenerContainer.this.getAckMode().equals(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE)) {
                                throw new IllegalStateException("AckMode must be MANUAL or MANUAL_IMMEDIATE for manual acks");
                            }
                            if (!Thread.currentThread().equals(ListenerConsumer.this.consumerThread)) {
                                throw new IllegalStateException("With MANUAL_IMMEDIATE ack mode, acknowledge() must be invoked on the consumer thread");
                            }
                            Map singletonMap = Collections.singletonMap(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
                            if (ListenerConsumer.this.logger.isDebugEnabled()) {
                                ListenerConsumer.this.logger.debug("Committing: " + singletonMap);
                            }
                            ListenerConsumer.this.consumer.commitAsync(singletonMap, ListenerConsumer.this.callback);
                        }

                        public String toString() {
                            return "Acknowledgment for " + consumerRecord;
                        }
                    });
                } else {
                    this.listener.onMessage(consumerRecord);
                }
            } catch (Exception e) {
                if (KafkaMessageListenerContainer.this.getErrorHandler() != null) {
                    KafkaMessageListenerContainer.this.getErrorHandler().handle(e, consumerRecord);
                } else {
                    this.logger.error("Listener threw an exception and no error handler for " + consumerRecord, e);
                }
            }
        }

        private void processCommits(AbstractMessageListenerContainer.AckMode ackMode, ConsumerRecords<K, V> consumerRecords) {
            this.count += consumerRecords.count();
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.BATCH)) {
                if (consumerRecords.isEmpty()) {
                    return;
                }
                this.consumer.commitAsync(this.callback);
                return;
            }
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE)) {
                return;
            }
            if (!ackMode.equals(AbstractMessageListenerContainer.AckMode.MANUAL)) {
                updatePendingOffsets(consumerRecords);
            }
            boolean z = this.count >= KafkaMessageListenerContainer.this.getAckCount();
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT) && z) {
                commitIfNecessary();
                this.count = 0;
                return;
            }
            long currentTimeMillis = System.currentTimeMillis();
            boolean z2 = currentTimeMillis - this.last > KafkaMessageListenerContainer.this.getAckTime();
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.TIME) && z2) {
                commitIfNecessary();
                this.last = currentTimeMillis;
            } else if (ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT_TIME) || ackMode.equals(AbstractMessageListenerContainer.AckMode.MANUAL)) {
                if (z2 || z) {
                    commitIfNecessary();
                    this.last = currentTimeMillis;
                    this.count = 0;
                }
            }
        }

        private void initPartitionsIfNeeded() {
            if (this.recentOffset > 0) {
                this.consumer.seekToEnd((TopicPartition[]) this.definedPartitions.toArray(new TopicPartition[this.definedPartitions.size()]));
                for (TopicPartition topicPartition : this.definedPartitions) {
                    long position = this.consumer.position(topicPartition) - this.recentOffset;
                    this.consumer.seek(topicPartition, position);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Reset " + topicPartition + " to offset " + position);
                    }
                }
            }
        }

        private void updatePendingOffsets(ConsumerRecords<K, V> consumerRecords) {
            Iterator it = consumerRecords.iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                if (!this.offsets.containsKey(consumerRecord.topic())) {
                    this.offsets.put(consumerRecord.topic(), new HashMap());
                }
                this.offsets.get(consumerRecord.topic()).put(Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()));
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void updateManualOffset(ConsumerRecord<K, V> consumerRecord) {
            if (!this.manualOffsets.containsKey(consumerRecord.topic())) {
                this.manualOffsets.putIfAbsent(consumerRecord.topic(), new ConcurrentHashMap());
            }
            this.manualOffsets.get(consumerRecord.topic()).put(Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()));
        }

        private void commitIfNecessary() {
            HashMap hashMap = new HashMap();
            if (AbstractMessageListenerContainer.AckMode.MANUAL.equals(KafkaMessageListenerContainer.this.getAckMode())) {
                for (Map.Entry<String, ConcurrentMap<Integer, Long>> entry : this.manualOffsets.entrySet()) {
                    Iterator<Map.Entry<Integer, Long>> it = entry.getValue().entrySet().iterator();
                    while (it.hasNext()) {
                        Map.Entry<Integer, Long> next = it.next();
                        hashMap.put(new TopicPartition(entry.getKey(), next.getKey().intValue()), new OffsetAndMetadata(next.getValue().longValue() + 1));
                        it.remove();
                    }
                }
            } else {
                for (Map.Entry<String, Map<Integer, Long>> entry2 : this.offsets.entrySet()) {
                    for (Map.Entry<Integer, Long> entry3 : entry2.getValue().entrySet()) {
                        hashMap.put(new TopicPartition(entry2.getKey(), entry3.getKey().intValue()), new OffsetAndMetadata(entry3.getValue().longValue() + 1));
                    }
                }
            }
            this.offsets.clear();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Committing: " + hashMap);
            }
            if (hashMap.isEmpty()) {
                return;
            }
            this.consumer.commitAsync(hashMap, this.callback);
        }
    }

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, TopicPartition... topicPartitionArr) {
        this(consumerFactory, null, null, topicPartitionArr);
    }

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, String... strArr) {
        this(consumerFactory, strArr, null, null);
    }

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, Pattern pattern) {
        this(consumerFactory, null, pattern, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, String[] strArr, Pattern pattern, TopicPartition[] topicPartitionArr) {
        this.consumerFactory = consumerFactory;
        this.topics = strArr == null ? null : (String[]) Arrays.asList(strArr).toArray(new String[strArr.length]);
        this.topicPattern = pattern;
        this.partitions = topicPartitionArr == null ? null : (TopicPartition[]) new LinkedHashSet(Arrays.asList(topicPartitionArr)).toArray(new TopicPartition[topicPartitionArr.length]);
    }

    public void setRecentOffset(long j) {
        this.recentOffset = j;
    }

    public Collection<TopicPartition> getAssignedPartitions() {
        if (((ListenerConsumer) this.listenerConsumer).definedPartitions != null) {
            return Collections.unmodifiableCollection(((ListenerConsumer) this.listenerConsumer).definedPartitions);
        }
        if (((ListenerConsumer) this.listenerConsumer).assignedPartitions != null) {
            return Collections.unmodifiableCollection(((ListenerConsumer) this.listenerConsumer).assignedPartitions);
        }
        return null;
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        setRunning(true);
        Object messageListener = getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (messageListener instanceof AcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (AcknowledgingMessageListener) messageListener;
        } else {
            if (!(messageListener instanceof MessageListener)) {
                throw new IllegalStateException("messageListener must be 'MessageListener' or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
            }
            this.listener = (MessageListener) messageListener;
        }
        if (getTaskExecutor() == null) {
            setTaskExecutor(new SimpleAsyncTaskExecutor(getBeanName() == null ? "kafka-" : getBeanName() + "-kafka-"));
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener, this.recentOffset);
        getTaskExecutor().execute(this.listenerConsumer);
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStop() {
        if (isRunning()) {
            setRunning(false);
            ((ListenerConsumer) this.listenerConsumer).consumer.wakeup();
        }
    }
}
