package org.apache.storm.kafka.spout;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.kafka.spout.internal.Timer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpout.class */
public class KafkaSpout<K, V> extends BaseRichSpout {
    private static final long serialVersionUID = 4151921085047987154L;
    public static final long TIMER_DELAY_MS = 500;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
    protected SpoutOutputCollector collector;
    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
    private KafkaConsumerFactory kafkaConsumerFactory;
    private transient KafkaConsumer<K, V> kafkaConsumer;
    private transient boolean consumerAutoCommitMode;
    private transient KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
    private transient KafkaSpoutRetryService retryService;
    private transient Timer commitTimer;
    private transient boolean initialized;
    private transient Map<TopicPartition, OffsetManager> acked;
    private transient Set<KafkaSpoutMessageId> emitted;
    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
    private transient long numUncommittedOffsets;
    private transient Timer refreshSubscriptionTimer;
    private transient TopologyContext context;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpout$KafkaSpoutConsumerRebalanceListener.class */
    public class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
        private KafkaSpoutConsumerRebalanceListener() {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            KafkaSpout.LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", new Object[]{KafkaSpout.this.kafkaSpoutConfig.getConsumerGroupId(), KafkaSpout.this.kafkaConsumer, collection});
            if (KafkaSpout.this.consumerAutoCommitMode || !KafkaSpout.this.initialized) {
                return;
            }
            KafkaSpout.this.initialized = false;
            KafkaSpout.this.commitOffsetsForAckedTuples();
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            KafkaSpout.LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]", new Object[]{KafkaSpout.this.kafkaSpoutConfig.getConsumerGroupId(), KafkaSpout.this.kafkaConsumer, collection});
            initialize(collection);
        }

        private void initialize(Collection<TopicPartition> collection) {
            if (!KafkaSpout.this.consumerAutoCommitMode) {
                KafkaSpout.this.acked.keySet().retainAll(collection);
            }
            KafkaSpout.this.retryService.retainAll(collection);
            HashSet hashSet = new HashSet(collection);
            Iterator it = KafkaSpout.this.emitted.iterator();
            while (it.hasNext()) {
                if (!hashSet.contains(((KafkaSpoutMessageId) it.next()).getTopicPartition())) {
                    it.remove();
                }
            }
            for (TopicPartition topicPartition : collection) {
                KafkaSpout.this.setAcked(topicPartition, doSeek(topicPartition, KafkaSpout.this.kafkaConsumer.committed(topicPartition)));
            }
            KafkaSpout.this.initialized = true;
            KafkaSpout.LOG.info("Initialization complete");
        }

        private long doSeek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
            long position;
            if (offsetAndMetadata == null) {
                if (KafkaSpout.this.firstPollOffsetStrategy.equals(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST) || KafkaSpout.this.firstPollOffsetStrategy.equals(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)) {
                    KafkaSpout.this.kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition));
                } else if (KafkaSpout.this.firstPollOffsetStrategy.equals(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) || KafkaSpout.this.firstPollOffsetStrategy.equals(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST)) {
                    KafkaSpout.this.kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
                }
                position = KafkaSpout.this.kafkaConsumer.position(topicPartition);
            } else if (KafkaSpout.this.firstPollOffsetStrategy.equals(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)) {
                KafkaSpout.this.kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition));
                position = KafkaSpout.this.kafkaConsumer.position(topicPartition);
            } else if (KafkaSpout.this.firstPollOffsetStrategy.equals(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)) {
                KafkaSpout.this.kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
                position = KafkaSpout.this.kafkaConsumer.position(topicPartition);
            } else {
                position = offsetAndMetadata.offset() + 1;
                KafkaSpout.this.kafkaConsumer.seek(topicPartition, position);
            }
            return position;
        }
    }

    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
    }

    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
        this.kafkaConsumerFactory = kafkaConsumerFactory;
        this.kafkaSpoutConfig = kafkaSpoutConfig;
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.initialized = false;
        this.context = topologyContext;
        this.collector = spoutOutputCollector;
        this.numUncommittedOffsets = 0L;
        this.firstPollOffsetStrategy = this.kafkaSpoutConfig.getFirstPollOffsetStrategy();
        this.consumerAutoCommitMode = this.kafkaSpoutConfig.isConsumerAutoCommitMode();
        this.retryService = this.kafkaSpoutConfig.getRetryService();
        if (!this.consumerAutoCommitMode) {
            this.commitTimer = new Timer(500L, this.kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
        }
        this.refreshSubscriptionTimer = new Timer(500L, this.kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
        this.acked = new HashMap();
        this.emitted = new HashSet();
        this.waitingToEmit = Collections.emptyListIterator();
        LOG.info("Kafka Spout opened with the following configuration: {}", this.kafkaSpoutConfig);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setAcked(TopicPartition topicPartition, long j) {
        if (this.consumerAutoCommitMode || this.acked.containsKey(topicPartition)) {
            return;
        }
        this.acked.put(topicPartition, new OffsetManager(topicPartition, j));
    }

    public void nextTuple() {
        try {
            if (this.initialized) {
                if (commit()) {
                    commitOffsetsForAckedTuples();
                }
                if (poll()) {
                    try {
                        setWaitingToEmit(pollKafkaBroker());
                    } catch (RetriableException e) {
                        LOG.error("Failed to poll from kafka.", e);
                    }
                }
                if (waitingToEmit()) {
                    emit();
                }
            } else {
                LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
            }
        } catch (InterruptException e2) {
            throwKafkaConsumerInterruptedException();
        }
    }

    private void throwKafkaConsumerInterruptedException() {
        throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
    }

    private boolean commit() {
        return !this.consumerAutoCommitMode && this.commitTimer.isExpiredResetOnTrue();
    }

    private boolean poll() {
        int maxUncommittedOffsets = this.kafkaSpoutConfig.getMaxUncommittedOffsets();
        boolean z = !waitingToEmit() && (this.numUncommittedOffsets < ((long) maxUncommittedOffsets) || this.consumerAutoCommitMode);
        if (!z) {
            if (waitingToEmit()) {
                LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", Long.valueOf(this.numUncommittedOffsets));
            }
            if (this.numUncommittedOffsets >= maxUncommittedOffsets && !this.consumerAutoCommitMode) {
                LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", Long.valueOf(this.numUncommittedOffsets), Integer.valueOf(maxUncommittedOffsets));
            }
        }
        return z;
    }

    private boolean waitingToEmit() {
        return this.waitingToEmit != null && this.waitingToEmit.hasNext();
    }

    public void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
        LinkedList linkedList = new LinkedList();
        Iterator it = consumerRecords.partitions().iterator();
        while (it.hasNext()) {
            linkedList.addAll(consumerRecords.records((TopicPartition) it.next()));
        }
        this.waitingToEmit = linkedList.iterator();
    }

    private ConsumerRecords<K, V> pollKafkaBroker() {
        doSeekRetriableTopicPartitions();
        if (this.refreshSubscriptionTimer.isExpiredResetOnTrue()) {
            this.kafkaSpoutConfig.getSubscription().refreshAssignment();
        }
        ConsumerRecords<K, V> poll = this.kafkaConsumer.poll(this.kafkaSpoutConfig.getPollTimeoutMs());
        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", Integer.valueOf(poll.count()), Long.valueOf(this.numUncommittedOffsets));
        return poll;
    }

    private void doSeekRetriableTopicPartitions() {
        for (TopicPartition topicPartition : this.retryService.retriableTopicPartitions()) {
            OffsetAndMetadata findNextCommitOffset = this.acked.get(topicPartition).findNextCommitOffset();
            if (findNextCommitOffset != null) {
                this.kafkaConsumer.seek(topicPartition, findNextCommitOffset.offset() + 1);
            } else {
                this.kafkaConsumer.seek(topicPartition, this.acked.get(topicPartition).getCommittedOffset() + 1);
            }
        }
    }

    private void emit() {
        while (!emitTupleIfNotEmitted(this.waitingToEmit.next()) && this.waitingToEmit.hasNext()) {
            this.waitingToEmit.remove();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> consumerRecord) {
        TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        KafkaSpoutMessageId kafkaSpoutMessageId = new KafkaSpoutMessageId(consumerRecord);
        if (this.acked.containsKey(topicPartition) && this.acked.get(topicPartition).contains(kafkaSpoutMessageId)) {
            LOG.trace("Tuple for record [{}] has already been acked. Skipping", consumerRecord);
            return false;
        }
        if (this.emitted.contains(kafkaSpoutMessageId)) {
            LOG.trace("Tuple for record [{}] has already been emitted. Skipping", consumerRecord);
            return false;
        }
        List<Object> apply = this.kafkaSpoutConfig.getTranslator().apply((ConsumerRecord) consumerRecord);
        if (!isEmitTuple(apply)) {
            LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", consumerRecord);
            kafkaSpoutMessageId.setEmitted(false);
            ack(kafkaSpoutMessageId);
            return false;
        }
        boolean isScheduled = this.retryService.isScheduled(kafkaSpoutMessageId);
        if (isScheduled && !this.retryService.isReady(kafkaSpoutMessageId)) {
            return false;
        }
        if (!this.consumerAutoCommitMode) {
            if (apply instanceof KafkaTuple) {
                this.collector.emit(((KafkaTuple) apply).getStream(), apply, kafkaSpoutMessageId);
            } else {
                this.collector.emit(apply, kafkaSpoutMessageId);
            }
            this.emitted.add(kafkaSpoutMessageId);
            if (isScheduled) {
                this.retryService.remove(kafkaSpoutMessageId);
            } else {
                this.numUncommittedOffsets++;
            }
        } else if (apply instanceof KafkaTuple) {
            this.collector.emit(((KafkaTuple) apply).getStream(), apply);
        } else {
            this.collector.emit(apply);
        }
        LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", new Object[]{apply, consumerRecord, kafkaSpoutMessageId});
        return true;
    }

    private boolean isEmitTuple(List<Object> list) {
        return list != null || this.kafkaSpoutConfig.isEmitNullTuples();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void commitOffsetsForAckedTuples() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, OffsetManager> entry : this.acked.entrySet()) {
            OffsetAndMetadata findNextCommitOffset = entry.getValue().findNextCommitOffset();
            if (findNextCommitOffset != null) {
                hashMap.put(entry.getKey(), findNextCommitOffset);
            }
        }
        if (hashMap.isEmpty()) {
            LOG.trace("No offsets to commit. {}", this);
            return;
        }
        this.kafkaConsumer.commitSync(hashMap);
        LOG.debug("Offsets successfully committed to Kafka [{}]", hashMap);
        for (Map.Entry<K, V> entry2 : hashMap.entrySet()) {
            this.numUncommittedOffsets -= this.acked.get((TopicPartition) entry2.getKey()).commit((OffsetAndMetadata) entry2.getValue());
            LOG.debug("[{}] uncommitted offsets across all topic partitions", Long.valueOf(this.numUncommittedOffsets));
        }
    }

    public void ack(Object obj) {
        KafkaSpoutMessageId kafkaSpoutMessageId = (KafkaSpoutMessageId) obj;
        if (this.emitted.contains(kafkaSpoutMessageId)) {
            if (!this.consumerAutoCommitMode) {
                this.acked.get(kafkaSpoutMessageId.getTopicPartition()).add(kafkaSpoutMessageId);
            }
            this.emitted.remove(kafkaSpoutMessageId);
        } else if (kafkaSpoutMessageId.isEmitted()) {
            LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that came from a topic-partition that this consumer group instance is no longer tracking due to rebalance/partition reassignment. No action taken.", kafkaSpoutMessageId);
        } else {
            LOG.debug("Received direct ack for message [{}], associated with null tuple", kafkaSpoutMessageId);
        }
    }

    public void fail(Object obj) {
        KafkaSpoutMessageId kafkaSpoutMessageId = (KafkaSpoutMessageId) obj;
        if (!this.emitted.contains(kafkaSpoutMessageId)) {
            LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", kafkaSpoutMessageId);
            return;
        }
        this.emitted.remove(kafkaSpoutMessageId);
        kafkaSpoutMessageId.incrementNumFails();
        if (this.retryService.schedule(kafkaSpoutMessageId)) {
            return;
        }
        LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", kafkaSpoutMessageId);
        ack(kafkaSpoutMessageId);
    }

    public void activate() {
        try {
            subscribeKafkaConsumer();
        } catch (InterruptException e) {
            throwKafkaConsumerInterruptedException();
        }
    }

    private void subscribeKafkaConsumer() {
        this.kafkaConsumer = this.kafkaConsumerFactory.createConsumer(this.kafkaSpoutConfig);
        this.kafkaSpoutConfig.getSubscription().subscribe(this.kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), this.context);
    }

    public void deactivate() {
        try {
            shutdown();
        } catch (InterruptException e) {
            throwKafkaConsumerInterruptedException();
        }
    }

    public void close() {
        try {
            shutdown();
        } catch (InterruptException e) {
            throwKafkaConsumerInterruptedException();
        }
    }

    private void shutdown() {
        try {
            if (!this.consumerAutoCommitMode) {
                commitOffsetsForAckedTuples();
            }
        } finally {
            this.kafkaConsumer.close();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        RecordTranslator<K, V> translator = this.kafkaSpoutConfig.getTranslator();
        for (String str : translator.streams()) {
            outputFieldsDeclarer.declareStream(str, translator.getFieldsFor(str));
        }
    }

    public String toString() {
        return "KafkaSpout{acked=" + this.acked + ", emitted=" + this.emitted + "}";
    }

    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> componentConfiguration = super.getComponentConfiguration();
        if (componentConfiguration == null) {
            componentConfiguration = new HashMap();
        }
        componentConfiguration.put("config.topics", getTopicsString());
        componentConfiguration.put("config.groupid", this.kafkaSpoutConfig.getConsumerGroupId());
        componentConfiguration.put("config.bootstrap.servers", this.kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
        componentConfiguration.put("config.security.protocol", this.kafkaSpoutConfig.getKafkaProps().get("security.protocol"));
        return componentConfiguration;
    }

    private String getTopicsString() {
        return this.kafkaSpoutConfig.getSubscription().getTopicsString();
    }
}
