/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.StampedRecord;

/**
 * Groups the {@link RecordQueue}s of a set of topic partitions and hands out
 * their buffered records one at a time, always drawing from the non-empty
 * queue whose {@link RecordQueue#timestamp()} is currently the smallest.
 *
 * <p>The group also keeps a running count of all records buffered across its
 * queues, exposed through {@link #numBuffered()}.
 */
public class PartitionGroup {
    /** Every queue of the group, keyed by the partition it buffers. */
    private final Map<TopicPartition, RecordQueue> partitionQueues;
    /** The queues that currently hold records, ordered by ascending queue timestamp. */
    private final PriorityQueue<RecordQueue> queuesByTime;
    /** Stored at construction time; not referenced by any other code in this class. */
    private final TimestampExtractor timestampExtractor;
    /** Running total of the records buffered across all queues of the group. */
    private int totalBuffered;

    /**
     * Creates a group over the given queues.
     *
     * @param partitionQueues    the queues of this group keyed by partition; the map is kept by
     *                           reference (not copied) and is emptied by {@link #close()}
     * @param timestampExtractor the timestamp extractor associated with this group
     */
    public PartitionGroup(Map<TopicPartition, RecordQueue> partitionQueues, TimestampExtractor timestampExtractor) {
        // PriorityQueue rejects an initial capacity below 1, so clamp the hint to keep an
        // empty partition map from failing with an opaque IllegalArgumentException.
        int initialCapacity = Math.max(1, partitionQueues.size());
        this.queuesByTime = new PriorityQueue<RecordQueue>(initialCapacity, new Comparator<RecordQueue>() {

            @Override
            public int compare(RecordQueue queue1, RecordQueue queue2) {
                return Long.compare(queue1.timestamp(), queue2.timestamp());
            }
        });
        this.partitionQueues = partitionQueues;
        this.timestampExtractor = timestampExtractor;
        this.totalBuffered = 0;
    }

    /**
     * Removes and returns the head record of the non-empty queue with the smallest timestamp.
     *
     * @param info receives the queue the record was taken from; its {@code queue} field is set
     *             to {@code null} when no queue currently holds records
     * @return the next record, or {@code null} if nothing could be polled
     */
    public StampedRecord nextRecord(RecordInfo info) {
        StampedRecord record = null;
        RecordQueue queue = this.queuesByTime.poll();
        if (queue != null) {
            record = queue.poll();
            // Only queues that still hold records go back into the time-ordered heap.
            if (!queue.isEmpty()) {
                this.queuesByTime.offer(queue);
            }
        }
        info.queue = queue;
        if (record != null) {
            --this.totalBuffered;
        }
        return record;
    }

    /**
     * Appends raw consumer records to the queue of the given partition.
     *
     * @param partition  the partition the records were fetched from
     * @param rawRecords the raw records to buffer
     * @return the size of the partition's queue after the records were added
     * @throws IllegalStateException if the partition does not belong to this group
     */
    public int addRawRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
        RecordQueue recordQueue = this.partitionQueues.get(partition);
        if (recordQueue == null) {
            // Fail the same way numBuffered(TopicPartition) does instead of with a bare NPE.
            throw new IllegalStateException("Partition " + partition + " does not belong to this partition-group.");
        }
        int oldSize = recordQueue.size();
        int newSize = recordQueue.addRawRecords(rawRecords);
        // A queue that was empty is not in the heap; register it now that it has records.
        if (oldSize == 0 && newSize > 0) {
            this.queuesByTime.offer(recordQueue);
        }
        this.totalBuffered += newSize - oldSize;
        return newSize;
    }

    /**
     * @return a read-only view of the partitions that make up this group
     */
    public Set<TopicPartition> partitions() {
        return Collections.unmodifiableSet(this.partitionQueues.keySet());
    }

    /**
     * Returns the smallest timestamp reported by any queue of the group, including queues that
     * are not currently in the time-ordered heap.
     *
     * @return the minimum queue timestamp, or {@link Long#MAX_VALUE} if the group has no queues
     */
    public long timestamp() {
        long timestamp = Long.MAX_VALUE;
        for (RecordQueue queue : this.partitionQueues.values()) {
            timestamp = Math.min(timestamp, queue.timestamp());
        }
        return timestamp;
    }

    /**
     * @param partition the partition to look up
     * @return the number of records buffered in the queue of the given partition
     * @throws IllegalStateException if the partition does not belong to this group
     */
    public int numBuffered(TopicPartition partition) {
        RecordQueue recordQueue = this.partitionQueues.get(partition);
        if (recordQueue == null) {
            throw new IllegalStateException("Record's partition does not belong to this partition-group.");
        }
        return recordQueue.size();
    }

    /**
     * @return the size of the queue at the head of the time-ordered heap, or 0 if the heap is empty
     */
    public int topQueueSize() {
        RecordQueue recordQueue = this.queuesByTime.peek();
        return recordQueue == null ? 0 : recordQueue.size();
    }

    /**
     * @return the total number of records buffered across all queues of the group
     */
    public int numBuffered() {
        return this.totalBuffered;
    }

    /**
     * Drops every queue from the group. This also empties the partition map that was passed to
     * the constructor, since the group holds that map by reference.
     */
    public void close() {
        this.queuesByTime.clear();
        this.partitionQueues.clear();
        // No queue is left, so the buffered-record count must not keep its old value.
        this.totalBuffered = 0;
    }

    /**
     * Discards all buffered records while keeping the queues registered with the group.
     */
    public void clear() {
        this.queuesByTime.clear();
        for (RecordQueue queue : this.partitionQueues.values()) {
            queue.clear();
        }
        // Every queue was just emptied; reset the counter so numBuffered() stays accurate.
        this.totalBuffered = 0;
    }

    /**
     * Mutable holder that {@link PartitionGroup#nextRecord(RecordInfo)} fills with the queue the
     * returned record was taken from.
     */
    public static class RecordInfo {
        /** The queue the most recent record came from; {@code null} if no queue had records. */
        public RecordQueue queue;

        /**
         * @return the source node of the held queue; fails with a NullPointerException when no
         *         queue is held
         */
        public ProcessorNode node() {
            return this.queue.source();
        }

        /**
         * @return the partition of the held queue; fails with a NullPointerException when no
         *         queue is held
         */
        public TopicPartition partition() {
            return this.queue.partition();
        }

        /**
         * @return the held queue, possibly {@code null}
         */
        public RecordQueue queue() {
            return this.queue;
        }
    }
}

