package org.apache.storm.kafka.spout.internal;

import java.util.Comparator;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.TreeSet;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/spout/internal/OffsetManager.class */
public class OffsetManager {
    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
    private static final Logger LOG = LoggerFactory.getLogger(OffsetManager.class);
    private final TopicPartition tp;
    private final long initialFetchOffset;
    private long committedOffset;
    private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet(OFFSET_COMPARATOR);

    /* loaded from: input_file:org/apache/storm/kafka/spout/internal/OffsetManager$OffsetComparator.class */
    private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
        private OffsetComparator() {
        }

        @Override // java.util.Comparator
        public int compare(KafkaSpoutMessageId kafkaSpoutMessageId, KafkaSpoutMessageId kafkaSpoutMessageId2) {
            if (kafkaSpoutMessageId.offset() < kafkaSpoutMessageId2.offset()) {
                return -1;
            }
            return kafkaSpoutMessageId.offset() == kafkaSpoutMessageId2.offset() ? 0 : 1;
        }
    }

    public OffsetManager(TopicPartition topicPartition, long j) {
        this.tp = topicPartition;
        this.initialFetchOffset = j;
        this.committedOffset = j - 1;
        LOG.debug("Instantiated {}", this);
    }

    public void add(KafkaSpoutMessageId kafkaSpoutMessageId) {
        this.ackedMsgs.add(kafkaSpoutMessageId);
    }

    public OffsetAndMetadata findNextCommitOffset() {
        boolean z = false;
        long j = this.committedOffset;
        KafkaSpoutMessageId kafkaSpoutMessageId = null;
        Iterator<KafkaSpoutMessageId> it = this.ackedMsgs.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            KafkaSpoutMessageId next = it.next();
            long offset = next.offset();
            if (offset == j + 1) {
                z = true;
                kafkaSpoutMessageId = next;
                j = offset;
            } else {
                if (next.offset() > j + 1) {
                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", this.tp, Long.valueOf(offset));
                    break;
                }
                LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]", new Object[]{this.tp, Long.valueOf(offset), Long.valueOf(this.committedOffset)});
            }
        }
        OffsetAndMetadata offsetAndMetadata = null;
        if (z) {
            offsetAndMetadata = new OffsetAndMetadata(j, kafkaSpoutMessageId.getMetadata(Thread.currentThread()));
            LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", new Object[]{this.tp, Long.valueOf(this.committedOffset + 1), Long.valueOf(offsetAndMetadata.offset())});
        } else {
            LOG.debug("topic-partition [{}] has NO offsets ready to be committed", this.tp);
        }
        LOG.trace("{}", this);
        return offsetAndMetadata;
    }

    public long commit(OffsetAndMetadata offsetAndMetadata) {
        long j = this.committedOffset;
        long offset = offsetAndMetadata.offset() - this.committedOffset;
        this.committedOffset = offsetAndMetadata.offset();
        Iterator<KafkaSpoutMessageId> it = this.ackedMsgs.iterator();
        while (it.hasNext() && it.next().offset() <= offsetAndMetadata.offset()) {
            it.remove();
        }
        LOG.trace("{}", this);
        LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].", new Object[]{Long.valueOf(j + 1), Long.valueOf(this.committedOffset), Long.valueOf(offset), this.tp});
        return offset;
    }

    public long getCommittedOffset() {
        return this.committedOffset;
    }

    public boolean isEmpty() {
        return this.ackedMsgs.isEmpty();
    }

    public boolean contains(ConsumerRecord consumerRecord) {
        return contains(new KafkaSpoutMessageId(consumerRecord));
    }

    public boolean contains(KafkaSpoutMessageId kafkaSpoutMessageId) {
        return this.ackedMsgs.contains(kafkaSpoutMessageId);
    }

    public String toString() {
        return "OffsetManager{topic-partition=" + this.tp + ", fetchOffset=" + this.initialFetchOffset + ", committedOffset=" + this.committedOffset + ", ackedMsgs=" + this.ackedMsgs + '}';
    }
}
