/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;

public class KafkaMessageListenerContainer<K, V>
extends AbstractMessageListenerContainer<K, V> {
    private final ConsumerFactory<K, V> consumerFactory;
    private final String[] topics;
    private final Pattern topicPattern;
    private final TopicPartition[] partitions;
    private ListenerConsumer listenerConsumer;
    private long recentOffset;
    private MessageListener<K, V> listener;
    private AcknowledgingMessageListener<K, V> acknowledgingMessageListener;

    /**
     * Create a container that consumes from an explicit list of partitions.
     * Delegates to the four-argument constructor with neither topic names nor
     * a topic pattern, so the consumer is given the partitions directly
     * instead of subscribing.
     * @param consumerFactory the factory used to create the Kafka consumer.
     * @param topicPartitions the partitions to consume from.
     */
    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, TopicPartition ... topicPartitions) {
        this(consumerFactory, null, null, topicPartitions);
    }

    /**
     * Create a container that subscribes to the given topic names.
     * Delegates to the four-argument constructor with neither a topic pattern
     * nor explicit partitions.
     * @param consumerFactory the factory used to create the Kafka consumer.
     * @param topics the names of the topics to subscribe to.
     */
    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, String ... topics) {
        this(consumerFactory, topics, null, null);
    }

    /**
     * Create a container that subscribes to every topic matching a pattern.
     * Delegates to the four-argument constructor with neither topic names nor
     * explicit partitions.
     * @param consumerFactory the factory used to create the Kafka consumer.
     * @param topicPattern the pattern passed to the consumer's subscribe call.
     */
    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, Pattern topicPattern) {
        this(consumerFactory, null, topicPattern, null);
    }

    /**
     * Shared constructor behind the public ones. The three sources are
     * consulted in this order when the consumer is created: explicit
     * partitions first, then the topic pattern, then the topic names.
     * @param consumerFactory the factory used to create the Kafka consumer.
     * @param topics topic names to subscribe to, or {@code null}; copied defensively.
     * @param topicPattern pattern of topics to subscribe to, or {@code null}.
     * @param topicPartitions explicit partitions to consume from, or {@code null};
     * duplicates are dropped while keeping first-seen order.
     */
    KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, String[] topics, Pattern topicPattern, TopicPartition[] topicPartitions) {
        this.consumerFactory = consumerFactory;
        // Copy so that later changes to the caller's array are not observed here.
        this.topics = topics == null ? null : Arrays.copyOf(topics, topics.length);
        this.topicPattern = topicPattern;
        if (topicPartitions == null) {
            this.partitions = null;
        } else {
            LinkedHashSet<TopicPartition> unique = new LinkedHashSet<TopicPartition>(Arrays.asList(topicPartitions));
            // Size the array from the de-duplicated set: sizing it from the input
            // length leaves trailing nulls whenever the input contained duplicates.
            this.partitions = unique.toArray(new TopicPartition[unique.size()]);
        }
    }

    /**
     * Set how far back from the end of each explicitly configured partition
     * consumption should begin. The value is handed to the consumer runnable
     * when the container starts, so it must be set before starting. It is only
     * acted on when it is greater than zero and the container was configured
     * with explicit partitions.
     * @param recentOffset the number of records to step back from the end position.
     */
    public void setRecentOffset(long recentOffset) {
        this.recentOffset = recentOffset;
    }

    /**
     * Return the partitions this container is consuming from.
     * Explicitly configured partitions take precedence; otherwise the
     * partitions most recently reported by the rebalance listener are returned.
     * @return an unmodifiable view of the partitions, or {@code null} when the
     * container has not been started yet or no partitions are known.
     */
    public Collection<TopicPartition> getAssignedPartitions() {
        // The consumer runnable only exists once doStart() has run; without this
        // guard a call before start() fails with a NullPointerException.
        ListenerConsumer current = this.listenerConsumer;
        if (current == null) {
            return null;
        }
        // Read each volatile field once so the null check and the returned
        // reference cannot observe two different values.
        Collection<TopicPartition> defined = current.definedPartitions;
        if (defined != null) {
            return Collections.unmodifiableCollection(defined);
        }
        Collection<TopicPartition> assigned = current.assignedPartitions;
        if (assigned != null) {
            return Collections.unmodifiableCollection(assigned);
        }
        return null;
    }

    /**
     * Start the container: validate the configured listener, make sure a task
     * executor is available, create the consumer runnable and hand it to the
     * executor. Does nothing when the container is already running.
     * <p>The running flag is only raised after validation and consumer creation
     * have succeeded, so a failed start does not leave the container marked as
     * running without a consumer.
     * @throws IllegalStateException if no listener is configured or it is of
     * an unsupported type.
     */
    @Override
    @SuppressWarnings("unchecked")
    protected void doStart() {
        if (this.isRunning()) {
            return;
        }
        Object messageListener = this.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (messageListener instanceof AcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (AcknowledgingMessageListener<K, V>) messageListener;
            // Clear any reference left over from an earlier start with a different listener type.
            this.listener = null;
        } else if (messageListener instanceof MessageListener) {
            this.listener = (MessageListener<K, V>) messageListener;
            this.acknowledgingMessageListener = null;
        } else {
            throw new IllegalStateException("messageListener must be 'MessageListener' or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
        }
        if (this.getTaskExecutor() == null) {
            Executor defaultExecutor = new SimpleAsyncTaskExecutor(this.getBeanName() == null ? "kafka-" : this.getBeanName() + "-kafka-");
            this.setTaskExecutor(defaultExecutor);
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener, this.recentOffset);
        // The runnable's loop checks isRunning(), so the flag must be set before it is scheduled.
        this.setRunning(true);
        this.getTaskExecutor().execute(this.listenerConsumer);
    }

    /**
     * Stop the container: clear the running flag and wake the consumer so a
     * blocking poll returns and the consumer loop can observe the flag.
     * Does nothing when the container is not running.
     */
    @Override
    protected void doStop() {
        if (this.isRunning()) {
            this.setRunning(false);
            // The running flag can be set while no consumer exists (for example
            // when consumer creation failed during start); avoid an NPE then.
            ListenerConsumer current = this.listenerConsumer;
            if (current != null) {
                current.consumer.wakeup();
            }
        }
    }

    private static final class CommitCallback
    implements OffsetCommitCallback {
        private static final Log logger = LogFactory.getLog(OffsetCommitCallback.class);

        private CommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                logger.error((Object)("Commit failed for " + offsets), (Throwable)exception);
            } else if (logger.isDebugEnabled()) {
                logger.debug((Object)("Commits for " + offsets + " completed"));
            }
        }
    }

    private class ListenerConsumer
    implements SchedulingAwareRunnable {
        private final Log logger = LogFactory.getLog(ListenerConsumer.class);
        // Logs the outcome of each asynchronous commit.
        private final CommitCallback callback = new CommitCallback();
        // The Kafka consumer driven by run(); created and subscribed/assigned in the constructor.
        private final Consumer<K, V> consumer;
        // topic -> partition -> offset of the most recently acknowledged record (used with AckMode.MANUAL).
        private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> manualOffsets = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>();
        // topic -> partition -> offset of the most recently received record that is awaiting commit.
        private final Map<String, Map<Integer, Long>> offsets = new HashMap<String, Map<Integer, Long>>();
        private final MessageListener<K, V> listener;
        private final AcknowledgingMessageListener<K, V> acknowledgingMessageListener;
        // How far back from the end of each explicitly assigned partition to start; ignored unless > 0.
        private final long recentOffset;
        // Read straight from the enclosing container's consumer factory; the decompiler's
        // synthetic accessor call (access$400) is not valid source.
        private final boolean autoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
        // The thread executing run(); used to check where MANUAL_IMMEDIATE acks are invoked.
        private Thread consumerThread;
        // Set when the container was configured with explicit partitions.
        private volatile Collection<TopicPartition> definedPartitions;
        // Last assignment reported by the rebalance listener (subscription mode).
        private volatile Collection<TopicPartition> assignedPartitions;
        // Records received since the counter was last reset by a commit.
        private int count;
        // Timestamp (ms) recorded at start-up and updated by time-triggered commits.
        private long last;

        /**
         * Create the Kafka consumer and either subscribe it (topic pattern or
         * topic names) or assign it the explicitly configured partitions.
         * @param listener the plain listener, or {@code null} when an acknowledging listener is used.
         * @param ackListener the acknowledging listener, or {@code null}.
         * @param recentOffset how far back from the end of each assigned partition to start.
         * @throws IllegalStateException if a manual ack mode is combined with an auto-committing consumer.
         */
        ListenerConsumer(MessageListener<K, V> listener, AcknowledgingMessageListener<K, V> ackListener, long recentOffset) {
            AbstractMessageListenerContainer.AckMode ackMode = KafkaMessageListenerContainer.this.getAckMode();
            boolean manualAcks = ackMode.equals(AbstractMessageListenerContainer.AckMode.MANUAL)
                    || ackMode.equals(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
            Assert.state(!manualAcks || !this.autoCommit, "Consumer cannot be configured for auto commit for ackMode " + ackMode);
            final Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
            ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener(){

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    KafkaMessageListenerContainer.this.logger.info("partitions revoked:" + partitions);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    ListenerConsumer.this.assignedPartitions = partitions;
                    KafkaMessageListenerContainer.this.logger.info("partitions assigned:" + partitions);
                }
            };
            try {
                if (KafkaMessageListenerContainer.this.partitions == null) {
                    if (KafkaMessageListenerContainer.this.topicPattern != null) {
                        consumer.subscribe(KafkaMessageListenerContainer.this.topicPattern, rebalanceListener);
                    } else {
                        consumer.subscribe(Arrays.asList(KafkaMessageListenerContainer.this.topics), rebalanceListener);
                    }
                } else {
                    List<TopicPartition> topicPartitions = Arrays.asList(KafkaMessageListenerContainer.this.partitions);
                    this.definedPartitions = topicPartitions;
                    consumer.assign(topicPartitions);
                }
            }
            catch (RuntimeException e) {
                // Nothing else holds a reference to the consumer yet, so release it
                // here rather than leaking it when subscription or assignment fails.
                try {
                    consumer.close();
                }
                catch (RuntimeException closeFailure) {
                    this.logger.warn("Failed to close consumer after setup failure", closeFailure);
                }
                throw e;
            }
            this.consumer = consumer;
            this.listener = listener;
            this.acknowledgingMessageListener = ackListener;
            this.recentOffset = recentOffset;
        }

        /**
         * Report this runnable as long-lived.
         * NOTE(review): presumably a scheduling hint for the task executor, since
         * run() keeps looping for as long as the container is running; confirm
         * against the SchedulingAwareRunnable contract.
         * @return always {@code true}.
         */
        public boolean isLongLived() {
            return true;
        }

        /**
         * Consumer loop. Polls while the container is running, passes every
         * record to the listener and commits offsets according to the ack mode
         * (unless the consumer auto-commits). When the loop ends, offsets that
         * are still pending are committed and the consumer is unsubscribed and
         * closed. The consumer is released even when the loop ends abnormally.
         */
        @Override
        public void run() {
            this.consumerThread = Thread.currentThread();
            this.count = 0;
            this.last = System.currentTimeMillis();
            try {
                if (KafkaMessageListenerContainer.this.isRunning() && this.definedPartitions != null) {
                    this.initPartitionsIfNeeded();
                }
                AbstractMessageListenerContainer.AckMode ackMode = KafkaMessageListenerContainer.this.getAckMode();
                while (KafkaMessageListenerContainer.this.isRunning()) {
                    try {
                        if (this.logger.isTraceEnabled()) {
                            this.logger.trace("Polling...");
                        }
                        ConsumerRecords<K, V> records = this.consumer.poll(KafkaMessageListenerContainer.this.getPollTimeout());
                        if (records == null) {
                            if (this.logger.isDebugEnabled()) {
                                this.logger.debug("No records");
                            }
                            continue;
                        }
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Received: " + records.count() + " records");
                        }
                        boolean commitEachRecord = !this.autoCommit && ackMode.equals(AbstractMessageListenerContainer.AckMode.RECORD);
                        for (ConsumerRecord<K, V> record : records) {
                            this.invokeListener(record);
                            if (commitEachRecord) {
                                // The committed offset is that of the next record to read.
                                this.consumer.commitAsync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L)), this.callback);
                            }
                        }
                        if (!this.autoCommit) {
                            this.processCommits(ackMode, records);
                        }
                    }
                    catch (WakeupException expected) {
                        // wakeup() is how the container interrupts a blocking poll when
                        // stopping; the loop condition decides whether to exit.
                    }
                    catch (Exception e) {
                        if (KafkaMessageListenerContainer.this.getErrorHandler() != null) {
                            KafkaMessageListenerContainer.this.getErrorHandler().handle(e, null);
                        } else {
                            this.logger.error("Container exception", e);
                        }
                    }
                }
                // Manual acks are tracked in manualOffsets, not offsets, so both maps
                // must be consulted or acknowledged records stay uncommitted on stop.
                if (!this.offsets.isEmpty() || !this.manualOffsets.isEmpty()) {
                    this.commitIfNecessary();
                }
            }
            finally {
                try {
                    this.consumer.unsubscribe();
                }
                catch (WakeupException ignored) {
                    // A wakeup still pending from the stop request may surface here;
                    // shutdown continues regardless.
                }
                finally {
                    this.consumer.close();
                }
                if (this.logger.isInfoEnabled()) {
                    this.logger.info("Consumer stopped");
                }
            }
        }

        /**
         * Deliver one record to the configured listener. An acknowledging
         * listener also receives an {@link Acknowledgment}: in MANUAL mode it
         * records the offset for a later commit, and in MANUAL_IMMEDIATE mode
         * it commits right away (which must happen on the consumer thread).
         * Exceptions thrown by the listener go to the container's error handler
         * when one is configured and are logged otherwise.
         * @param record the record to deliver.
         */
        private void invokeListener(final ConsumerRecord<K, V> record) {
            try {
                if (this.acknowledgingMessageListener != null) {
                    this.acknowledgingMessageListener.onMessage(record, new Acknowledgment(){

                        @Override
                        public void acknowledge() {
                            AbstractMessageListenerContainer.AckMode mode = KafkaMessageListenerContainer.this.getAckMode();
                            if (mode.equals(AbstractMessageListenerContainer.AckMode.MANUAL)) {
                                ListenerConsumer.this.updateManualOffset(record);
                                return;
                            }
                            if (!mode.equals(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE)) {
                                throw new IllegalStateException("AckMode must be MANUAL or MANUAL_IMMEDIATE for manual acks");
                            }
                            if (!Thread.currentThread().equals(ListenerConsumer.this.consumerThread)) {
                                throw new IllegalStateException("With MANUAL_IMMEDIATE ack mode, acknowledge() must be invoked on the consumer thread");
                            }
                            // The committed offset is that of the next record to read.
                            Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L));
                            if (ListenerConsumer.this.logger.isDebugEnabled()) {
                                ListenerConsumer.this.logger.debug("Committing: " + commits);
                            }
                            ListenerConsumer.this.consumer.commitAsync(commits, ListenerConsumer.this.callback);
                        }

                        @Override
                        public String toString() {
                            return "Acknowledgment for " + record;
                        }
                    });
                } else {
                    this.listener.onMessage(record);
                }
            }
            catch (Exception e) {
                if (KafkaMessageListenerContainer.this.getErrorHandler() != null) {
                    KafkaMessageListenerContainer.this.getErrorHandler().handle(e, record);
                } else {
                    // Only log when nothing handled the failure; the message states
                    // that no error handler is present.
                    this.logger.error("Listener threw an exception and no error handler for " + record, e);
                }
            }
        }

        /**
         * Decide, after a poll, whether offsets should be committed now.
         * <ul>
         * <li>BATCH: commit asynchronously whenever the poll returned records.</li>
         * <li>MANUAL_IMMEDIATE: nothing to do here.</li>
         * <li>COUNT: commit once the record counter reaches the ack count.</li>
         * <li>TIME: commit once more than the ack time has passed since the last one.</li>
         * <li>COUNT_TIME and MANUAL: commit when either threshold is reached.</li>
         * </ul>
         * For every mode except BATCH, MANUAL_IMMEDIATE and MANUAL the offsets
         * of the polled records are first remembered as pending.
         * @param ackMode the container's ack mode.
         * @param records the records returned by the most recent poll.
         */
        private void processCommits(AbstractMessageListenerContainer.AckMode ackMode, ConsumerRecords<K, V> records) {
            this.count += records.count();
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.BATCH)) {
                if (!records.isEmpty()) {
                    this.consumer.commitAsync(this.callback);
                }
                return;
            }
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE)) {
                return;
            }
            boolean manual = ackMode.equals(AbstractMessageListenerContainer.AckMode.MANUAL);
            if (!manual) {
                this.updatePendingOffsets(records);
            }
            boolean countReached = this.count >= KafkaMessageListenerContainer.this.getAckCount();
            if (countReached && ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT)) {
                this.commitIfNecessary();
                this.count = 0;
                return;
            }
            long now = System.currentTimeMillis();
            boolean timeReached = now - this.last > KafkaMessageListenerContainer.this.getAckTime();
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.TIME)) {
                if (timeReached) {
                    this.commitIfNecessary();
                    this.last = now;
                }
                return;
            }
            boolean eitherThreshold = manual || ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT_TIME);
            if (eitherThreshold && (timeReached || countReached)) {
                this.commitIfNecessary();
                this.last = now;
                this.count = 0;
            }
        }

        /**
         * When a positive recent offset is configured, reposition every
         * explicitly assigned partition that many records before its end.
         * The target is clamped at zero so a partition holding fewer records
         * than requested does not produce a negative seek position.
         */
        private void initPartitionsIfNeeded() {
            if (this.recentOffset > 0L) {
                this.consumer.seekToEnd(this.definedPartitions.toArray(new TopicPartition[this.definedPartitions.size()]));
                for (TopicPartition topicPartition : this.definedPartitions) {
                    // After seekToEnd, position() reports the end offset of the partition.
                    long newOffset = Math.max(0L, this.consumer.position(topicPartition) - this.recentOffset);
                    this.consumer.seek(topicPartition, newOffset);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Reset " + topicPartition + " to offset " + newOffset);
                    }
                }
            }
        }

        /**
         * Remember, per topic and partition, the offset of the latest record in
         * the given batch so that it can be committed later.
         * @param records the records returned by the most recent poll.
         */
        private void updatePendingOffsets(ConsumerRecords<K, V> records) {
            for (ConsumerRecord<K, V> received : records) {
                String topic = received.topic();
                Map<Integer, Long> latestByPartition = this.offsets.get(topic);
                if (latestByPartition == null) {
                    latestByPartition = new HashMap<Integer, Long>();
                    this.offsets.put(topic, latestByPartition);
                }
                latestByPartition.put(received.partition(), received.offset());
            }
        }

        /**
         * Record the offset of a manually acknowledged record so that a later
         * commit can pick it up. Backed by concurrent maps; the per-topic map is
         * created on first use.
         * @param record the acknowledged record.
         */
        private void updateManualOffset(ConsumerRecord<K, V> record) {
            ConcurrentMap<Integer, Long> ackedByPartition = this.manualOffsets.get(record.topic());
            if (ackedByPartition == null) {
                // putIfAbsent keeps whichever map was registered first for this topic.
                this.manualOffsets.putIfAbsent(record.topic(), new ConcurrentHashMap<Integer, Long>());
                ackedByPartition = this.manualOffsets.get(record.topic());
            }
            ackedByPartition.put(record.partition(), record.offset());
        }

        /**
         * Commit whatever is pending. In MANUAL mode the source is the map of
         * acknowledged offsets; otherwise it is the map of offsets remembered
         * after each poll. Committed values are the stored offset plus one (the
         * next record to read). Nothing is sent when there is nothing to commit.
         */
        private void commitIfNecessary() {
            Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<TopicPartition, OffsetAndMetadata>();
            if (AbstractMessageListenerContainer.AckMode.MANUAL.equals(KafkaMessageListenerContainer.this.getAckMode())) {
                for (Map.Entry<String, ConcurrentMap<Integer, Long>> topicEntry : this.manualOffsets.entrySet()) {
                    ConcurrentMap<Integer, Long> ackedByPartition = topicEntry.getValue();
                    for (Map.Entry<Integer, Long> ackedEntry : ackedByPartition.entrySet()) {
                        Integer partition = ackedEntry.getKey();
                        Long acked = ackedEntry.getValue();
                        commits.put(new TopicPartition(topicEntry.getKey(), partition), new OffsetAndMetadata(acked + 1L));
                        // Remove only if the value is still the one just read: MANUAL acks are
                        // not restricted to the consumer thread, and an unconditional removal
                        // would discard a newer ack written in between.
                        ackedByPartition.remove(partition, acked);
                    }
                }
            } else {
                for (Map.Entry<String, Map<Integer, Long>> topicEntry : this.offsets.entrySet()) {
                    for (Map.Entry<Integer, Long> pendingEntry : topicEntry.getValue().entrySet()) {
                        commits.put(new TopicPartition(topicEntry.getKey(), pendingEntry.getKey()), new OffsetAndMetadata(pendingEntry.getValue() + 1L));
                    }
                }
            }
            this.offsets.clear();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Committing: " + commits);
            }
            if (!commits.isEmpty()) {
                this.consumer.commitAsync(commits, this.callback);
            }
        }
    }
}

