/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;

public class ShareInFlightBatch<K, V> {
    final TopicIdPartition partition;
    private final Map<Long, ConsumerRecord<K, V>> inFlightRecords;
    private final Set<Long> acknowledgedRecords;
    private Acknowledgements acknowledgements;
    private KafkaException exception;
    private boolean hasCachedException = false;

    public ShareInFlightBatch(TopicIdPartition partition) {
        this.partition = partition;
        this.inFlightRecords = new TreeMap<Long, ConsumerRecord<K, V>>();
        this.acknowledgedRecords = new TreeSet<Long>();
        this.acknowledgements = Acknowledgements.empty();
    }

    public void addAcknowledgement(long offset, AcknowledgeType acknowledgeType) {
        this.acknowledgements.add(offset, acknowledgeType);
    }

    public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
        if (this.inFlightRecords.get(record.offset()) != null) {
            this.acknowledgements.add(record.offset(), type);
            this.acknowledgedRecords.add(record.offset());
            return;
        }
        throw new IllegalStateException("The record cannot be acknowledged.");
    }

    public int acknowledgeAll(AcknowledgeType type) {
        int recordsAcknowledged = 0;
        for (Map.Entry<Long, ConsumerRecord<K, V>> entry : this.inFlightRecords.entrySet()) {
            if (!this.acknowledgements.addIfAbsent(entry.getKey(), type)) continue;
            this.acknowledgedRecords.add(entry.getKey());
            ++recordsAcknowledged;
        }
        return recordsAcknowledged;
    }

    public void addRecord(ConsumerRecord<K, V> record) {
        this.inFlightRecords.put(record.offset(), record);
    }

    public void addGap(long offset) {
        this.acknowledgements.addGap(offset);
    }

    public void merge(ShareInFlightBatch<K, V> other) {
        this.inFlightRecords.putAll(other.inFlightRecords);
    }

    List<ConsumerRecord<K, V>> getInFlightRecords() {
        return new ArrayList<ConsumerRecord<K, V>>(this.inFlightRecords.values());
    }

    int numRecords() {
        return this.inFlightRecords.size();
    }

    Acknowledgements takeAcknowledgedRecords() {
        if (this.acknowledgedRecords.size() == this.inFlightRecords.size()) {
            this.inFlightRecords.clear();
        } else {
            this.acknowledgedRecords.forEach(this.inFlightRecords::remove);
        }
        this.acknowledgedRecords.clear();
        Acknowledgements currentAcknowledgements = this.acknowledgements;
        this.acknowledgements = Acknowledgements.empty();
        return currentAcknowledgements;
    }

    Acknowledgements getAcknowledgements() {
        return this.acknowledgements;
    }

    public boolean isEmpty() {
        return this.inFlightRecords.isEmpty() && this.acknowledgements.isEmpty();
    }

    public void setException(KafkaException exception) {
        this.exception = exception;
    }

    public KafkaException getException() {
        return this.exception;
    }

    public void setHasCachedException(boolean hasCachedException) {
        this.hasCachedException = hasCachedException;
    }

    public boolean hasCachedException() {
        return this.hasCachedException;
    }
}

