/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.kafka.spout.KafkaSpoutStreams;
import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSpout<K, V>
extends BaseRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
    protected SpoutOutputCollector collector;
    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
    private transient KafkaConsumer<K, V> kafkaConsumer;
    private transient boolean consumerAutoCommitMode;
    private transient int maxRetries;
    private transient KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
    private transient KafkaSpoutRetryService retryService;
    private transient Timer commitTimer;
    private transient boolean initialized;
    private KafkaSpoutStreams kafkaSpoutStreams;
    private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
    private transient Map<TopicPartition, OffsetEntry> acked;
    private transient Set<KafkaSpoutMessageId> emitted;
    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
    private transient long numUncommittedOffsets;

    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
        this.kafkaSpoutConfig = kafkaSpoutConfig;
        this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.initialized = false;
        this.collector = collector;
        this.maxRetries = this.kafkaSpoutConfig.getMaxTupleRetries();
        this.numUncommittedOffsets = 0L;
        this.firstPollOffsetStrategy = this.kafkaSpoutConfig.getFirstPollOffsetStrategy();
        this.consumerAutoCommitMode = this.kafkaSpoutConfig.isConsumerAutoCommitMode();
        this.retryService = this.kafkaSpoutConfig.getRetryService();
        this.tuplesBuilder = this.kafkaSpoutConfig.getTuplesBuilder();
        if (!this.consumerAutoCommitMode) {
            this.commitTimer = new Timer(500L, this.kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
        }
        this.acked = new HashMap<TopicPartition, OffsetEntry>();
        this.emitted = new HashSet<KafkaSpoutMessageId>();
        this.waitingToEmit = Collections.emptyListIterator();
        LOG.info("Kafka Spout opened with the following configuration: {}", this.kafkaSpoutConfig);
    }

    private void setAcked(TopicPartition tp, long fetchOffset) {
        if (!this.consumerAutoCommitMode && !this.acked.containsKey(tp)) {
            this.acked.put(tp, new OffsetEntry(tp, fetchOffset));
        }
    }

    public void nextTuple() {
        if (this.initialized) {
            if (this.commit()) {
                this.commitOffsetsForAckedTuples();
            }
            if (this.poll()) {
                this.setWaitingToEmit(this.pollKafkaBroker());
            }
            if (this.waitingToEmit()) {
                this.emit();
            }
        } else {
            LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
        }
    }

    private boolean commit() {
        return !this.consumerAutoCommitMode && this.commitTimer.isExpiredResetOnTrue();
    }

    private boolean poll() {
        return !this.waitingToEmit() && this.numUncommittedOffsets < (long)this.kafkaSpoutConfig.getMaxUncommittedOffsets();
    }

    private boolean waitingToEmit() {
        return this.waitingToEmit != null && this.waitingToEmit.hasNext();
    }

    public void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
        LinkedList waitingToEmitList = new LinkedList();
        for (TopicPartition tp : consumerRecords.partitions()) {
            waitingToEmitList.addAll(consumerRecords.records(tp));
        }
        this.waitingToEmit = waitingToEmitList.iterator();
        LOG.trace("Records waiting to be emitted {}", waitingToEmitList);
    }

    private ConsumerRecords<K, V> pollKafkaBroker() {
        this.doSeekRetriableTopicPartitions();
        ConsumerRecords consumerRecords = this.kafkaConsumer.poll(this.kafkaSpoutConfig.getPollTimeoutMs());
        int numPolledRecords = consumerRecords.count();
        LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", (Object)numPolledRecords, (Object)this.numUncommittedOffsets);
        return consumerRecords;
    }

    private void doSeekRetriableTopicPartitions() {
        Set<TopicPartition> retriableTopicPartitions = this.retryService.retriableTopicPartitions();
        for (TopicPartition rtp : retriableTopicPartitions) {
            OffsetAndMetadata offsetAndMeta = this.acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                this.kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1L);
                continue;
            }
            this.kafkaConsumer.seekToEnd(new TopicPartition[]{rtp});
        }
    }

    private void emit() {
        this.emitTupleIfNotEmitted(this.waitingToEmit.next());
        this.waitingToEmit.remove();
    }

    private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
        if (this.acked.containsKey(tp) && this.acked.get(tp).contains(msgId)) {
            LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
        } else if (this.emitted.contains(msgId)) {
            LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
        } else if (!this.retryService.isScheduled(msgId) || this.retryService.isReady(msgId)) {
            List<Object> tuple = this.tuplesBuilder.buildTuple(record);
            this.kafkaSpoutStreams.emit(this.collector, tuple, msgId);
            this.emitted.add(msgId);
            ++this.numUncommittedOffsets;
            if (this.retryService.isReady(msgId)) {
                this.retryService.remove(msgId);
            }
            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
        }
    }

    private void commitOffsetsForAckedTuples() {
        HashMap<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : this.acked.entrySet()) {
            OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
            if (nextCommitOffset == null) continue;
            nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
        }
        if (!nextCommitOffsets.isEmpty()) {
            this.kafkaConsumer.commitSync(nextCommitOffsets);
            LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
            for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : this.acked.entrySet()) {
                OffsetEntry offsetEntry = tpOffset.getValue();
                offsetEntry.commit((OffsetAndMetadata)nextCommitOffsets.get(tpOffset.getKey()));
            }
        } else {
            LOG.trace("No offsets to commit. {}", (Object)this);
        }
    }

    public void ack(Object messageId) {
        KafkaSpoutMessageId msgId = (KafkaSpoutMessageId)messageId;
        if (!this.consumerAutoCommitMode) {
            this.acked.get(msgId.getTopicPartition()).add(msgId);
            LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", (Object)msgId, this.acked);
        }
        this.emitted.remove(msgId);
    }

    public void fail(Object messageId) {
        KafkaSpoutMessageId msgId = (KafkaSpoutMessageId)messageId;
        this.emitted.remove(msgId);
        if (msgId.numFails() < this.maxRetries) {
            msgId.incrementNumFails();
            this.retryService.schedule(msgId);
        } else {
            LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", (Object)msgId);
            this.ack(msgId);
        }
    }

    public void activate() {
        this.subscribeKafkaConsumer();
    }

    private void subscribeKafkaConsumer() {
        this.kafkaConsumer = new KafkaConsumer(this.kafkaSpoutConfig.getKafkaProps(), this.kafkaSpoutConfig.getKeyDeserializer(), this.kafkaSpoutConfig.getValueDeserializer());
        this.kafkaConsumer.subscribe(this.kafkaSpoutConfig.getSubscribedTopics(), (ConsumerRebalanceListener)new KafkaSpoutConsumerRebalanceListener());
        this.kafkaConsumer.poll(0L);
    }

    public void deactivate() {
        this.shutdown();
    }

    public void close() {
        this.shutdown();
    }

    private void shutdown() {
        try {
            this.kafkaConsumer.wakeup();
            if (!this.consumerAutoCommitMode) {
                this.commitOffsetsForAckedTuples();
            }
        }
        finally {
            this.kafkaConsumer.close();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        this.kafkaSpoutStreams.declareOutputFields(declarer);
    }

    public String toString() {
        return "{acked=" + this.acked + "} ";
    }

    static /* synthetic */ Comparator access$1200() {
        return OFFSET_COMPARATOR;
    }

    private class Timer {
        private final long delay;
        private final long period;
        private final TimeUnit timeUnit;
        private final long periodNanos;
        private long start;

        public Timer(long delay, long period, TimeUnit timeUnit) {
            this.delay = delay;
            this.period = period;
            this.timeUnit = timeUnit;
            this.periodNanos = timeUnit.toNanos(period);
            this.start = System.nanoTime() + timeUnit.toNanos(delay);
        }

        public long period() {
            return this.period;
        }

        public long delay() {
            return this.delay;
        }

        public TimeUnit getTimeUnit() {
            return this.timeUnit;
        }

        public boolean isExpiredResetOnTrue() {
            boolean expired;
            boolean bl = expired = System.nanoTime() - this.start > this.periodNanos;
            if (expired) {
                this.start = System.nanoTime();
            }
            return expired;
        }
    }

    private class OffsetEntry {
        private final TopicPartition tp;
        private final long initialFetchOffset;
        private long committedOffset;
        private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<KafkaSpoutMessageId>(KafkaSpout.access$1200());

        public OffsetEntry(TopicPartition tp, long initialFetchOffset) {
            this.tp = tp;
            this.initialFetchOffset = initialFetchOffset;
            this.committedOffset = initialFetchOffset - 1L;
            LOG.debug("Instantiated {}", (Object)this);
        }

        public void add(KafkaSpoutMessageId msgId) {
            this.ackedMsgs.add(msgId);
        }

        public OffsetAndMetadata findNextCommitOffset() {
            boolean found = false;
            long nextCommitOffset = this.committedOffset;
            KafkaSpoutMessageId nextCommitMsg = null;
            for (KafkaSpoutMessageId currAckedMsg : this.ackedMsgs) {
                long currOffset = currAckedMsg.offset();
                if (currOffset == this.initialFetchOffset || currOffset == nextCommitOffset + 1L) {
                    found = true;
                    nextCommitMsg = currAckedMsg;
                    nextCommitOffset = currOffset;
                    LOG.trace("Found offset to commit [{}]. {}", (Object)currOffset, (Object)this);
                    continue;
                }
                if (currAckedMsg.offset() > nextCommitOffset + 1L) {
                    LOG.debug("Non continuous offset found [{}]. It will be processed in a subsequent batch. {}", (Object)currOffset, (Object)this);
                    break;
                }
                LOG.debug("Unexpected offset found [{}]. {}", (Object)currOffset, (Object)this);
                break;
            }
            OffsetAndMetadata nextCommitOffsetAndMetadata = null;
            if (found) {
                nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
                LOG.debug("Offset to be committed next: [{}] {}", (Object)nextCommitOffsetAndMetadata.offset(), (Object)this);
            } else {
                LOG.debug("No offsets ready to commit. {}", (Object)this);
            }
            return nextCommitOffsetAndMetadata;
        }

        public void commit(OffsetAndMetadata committedOffset) {
            if (committedOffset != null) {
                long numCommittedOffsets = committedOffset.offset() - this.committedOffset;
                this.committedOffset = committedOffset.offset();
                Iterator<KafkaSpoutMessageId> iterator = this.ackedMsgs.iterator();
                while (iterator.hasNext() && iterator.next().offset() <= committedOffset.offset()) {
                    iterator.remove();
                }
                KafkaSpout.this.numUncommittedOffsets = KafkaSpout.this.numUncommittedOffsets - numCommittedOffsets;
            }
            LOG.trace("Object state after update: {}, numUncommittedOffsets [{}]", (Object)this, (Object)KafkaSpout.this.numUncommittedOffsets);
        }

        public boolean isEmpty() {
            return this.ackedMsgs.isEmpty();
        }

        public boolean contains(ConsumerRecord record) {
            return this.contains(new KafkaSpoutMessageId(record));
        }

        public boolean contains(KafkaSpoutMessageId msgId) {
            return this.ackedMsgs.contains(msgId);
        }

        public String toString() {
            return "OffsetEntry{topic-partition=" + this.tp + ", fetchOffset=" + this.initialFetchOffset + ", committedOffset=" + this.committedOffset + ", ackedMsgs=" + this.ackedMsgs + '}';
        }
    }

    private static class OffsetComparator
    implements Comparator<KafkaSpoutMessageId> {
        private OffsetComparator() {
        }

        @Override
        public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
            return m1.offset() < m2.offset() ? -1 : (m1.offset() == m2.offset() ? 0 : 1);
        }
    }

    private class KafkaSpoutConsumerRebalanceListener
    implements ConsumerRebalanceListener {
        private KafkaSpoutConsumerRebalanceListener() {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            LOG.debug("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", new Object[]{KafkaSpout.this.kafkaSpoutConfig.getConsumerGroupId(), KafkaSpout.this.kafkaConsumer, partitions});
            if (!KafkaSpout.this.consumerAutoCommitMode && KafkaSpout.this.initialized) {
                KafkaSpout.this.initialized = false;
                KafkaSpout.this.commitOffsetsForAckedTuples();
            }
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            LOG.debug("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]", new Object[]{KafkaSpout.this.kafkaSpoutConfig.getConsumerGroupId(), KafkaSpout.this.kafkaConsumer, partitions});
            this.initialize(partitions);
        }

        private void initialize(Collection<TopicPartition> partitions) {
            if (!KafkaSpout.this.consumerAutoCommitMode) {
                KafkaSpout.this.acked.keySet().retainAll(partitions);
            }
            KafkaSpout.this.retryService.retainAll(partitions);
            for (TopicPartition tp : partitions) {
                OffsetAndMetadata committedOffset = KafkaSpout.this.kafkaConsumer.committed(tp);
                long fetchOffset = this.doSeek(tp, committedOffset);
                KafkaSpout.this.setAcked(tp, fetchOffset);
            }
            KafkaSpout.this.initialized = true;
            LOG.debug("Initialization complete");
        }

        private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
            long fetchOffset;
            if (committedOffset != null) {
                if (KafkaSpout.this.firstPollOffsetStrategy.equals((Object)KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)) {
                    KafkaSpout.this.kafkaConsumer.seekToBeginning(new TopicPartition[]{tp});
                    fetchOffset = KafkaSpout.this.kafkaConsumer.position(tp);
                } else if (KafkaSpout.this.firstPollOffsetStrategy.equals((Object)KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)) {
                    KafkaSpout.this.kafkaConsumer.seekToEnd(new TopicPartition[]{tp});
                    fetchOffset = KafkaSpout.this.kafkaConsumer.position(tp);
                } else {
                    fetchOffset = committedOffset.offset() + 1L;
                    KafkaSpout.this.kafkaConsumer.seek(tp, fetchOffset);
                }
            } else {
                if (KafkaSpout.this.firstPollOffsetStrategy.equals((Object)KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST) || KafkaSpout.this.firstPollOffsetStrategy.equals((Object)KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)) {
                    KafkaSpout.this.kafkaConsumer.seekToBeginning(new TopicPartition[]{tp});
                } else if (KafkaSpout.this.firstPollOffsetStrategy.equals((Object)KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) || KafkaSpout.this.firstPollOffsetStrategy.equals((Object)KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST)) {
                    KafkaSpout.this.kafkaConsumer.seekToEnd(new TopicPartition[]{tp});
                }
                fetchOffset = KafkaSpout.this.kafkaConsumer.position(tp);
            }
            return fetchOffset;
        }
    }
}

