/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout.metrics;

import com.google.common.base.Supplier;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.metric.api.IMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Metric that reports Kafka offset figures for every topic partition tracked by the
 * supplied offset managers, plus per-topic totals of the same figures.
 *
 * <p>Each call to {@link #getValueAndReset()} asks the supplied consumer for the beginning
 * and end offsets of the tracked partitions and combines them with the emitted and committed
 * offsets held by each partition's {@link OffsetManager}.
 *
 * @param <K> key type of the Kafka consumer
 * @param <V> value type of the Kafka consumer
 */
public class KafkaOffsetMetric<K, V>
implements IMetric {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
    private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
    private final Supplier<KafkaConsumer<K, V>> consumerSupplier;

    /**
     * Creates the metric.
     *
     * @param offsetManagerSupplier supplies the current offset managers keyed by topic partition;
     *     it is queried on every call to {@link #getValueAndReset()}
     * @param consumerSupplier supplies the consumer used to look up beginning and end offsets;
     *     it is queried on every call to {@link #getValueAndReset()}
     */
    public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier,
            Supplier<KafkaConsumer<K, V>> consumerSupplier) {
        this.offsetManagerSupplier = offsetManagerSupplier;
        this.consumerSupplier = consumerSupplier;
    }

    /**
     * Builds a snapshot of the offset metrics.
     *
     * <p>Per partition the keys are {@code <topic>/partition_<n>/<name>} and per topic
     * {@code <topic>/total<Name>}, where the names are spoutLag, earliestTimeOffset,
     * latestTimeOffset, latestEmittedOffset, latestCompletedOffset and recordsInPartition(s).
     * A partition for which the consumer returned no beginning or end offset is skipped and
     * contributes neither to the per-partition entries nor to the topic totals.
     *
     * <p>Exceptions thrown by the consumer's offset lookups are not caught here.
     *
     * @return a {@code Map<String, Long>} of metric path to value, or {@code null} when the
     *     offset managers are null or empty or the consumer is null
     */
    public Object getValueAndReset() {
        Map<TopicPartition, OffsetManager> offsetManagers = this.offsetManagerSupplier.get();
        KafkaConsumer<K, V> kafkaConsumer = this.consumerSupplier.get();
        if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
            LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");
            return null;
        }
        Map<String, TopicMetrics> topicMetricsMap = new HashMap<String, TopicMetrics>();
        Set<TopicPartition> topicPartitions = offsetManagers.keySet();
        Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
        Map<String, Long> result = new HashMap<String, Long>();
        for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetManager offsetManager = entry.getValue();
            Long endOffset = endOffsets.get(topicPartition);
            Long beginningOffset = beginningOffsets.get(topicPartition);
            if (endOffset == null || beginningOffset == null) {
                // Unboxing a missing entry would throw a NullPointerException and abort the
                // whole tick; skip just this partition instead.
                LOG.debug("Metrics Tick: no beginning/end offset for {}, skipping partition.", topicPartition);
                continue;
            }
            long latestTimeOffset = endOffset;
            long earliestTimeOffset = beginningOffset;
            long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
            long latestCompletedOffset = offsetManager.getCommittedOffset();
            // Lag is measured against the committed offset, not the latest emitted one.
            long spoutLag = latestTimeOffset - latestCompletedOffset;
            long recordsInPartition = latestTimeOffset - earliestTimeOffset;
            String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
            result.put(metricPath + "/spoutLag", spoutLag);
            result.put(metricPath + "/earliestTimeOffset", earliestTimeOffset);
            result.put(metricPath + "/latestTimeOffset", latestTimeOffset);
            result.put(metricPath + "/latestEmittedOffset", latestEmittedOffset);
            result.put(metricPath + "/latestCompletedOffset", latestCompletedOffset);
            result.put(metricPath + "/recordsInPartition", recordsInPartition);
            TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
            if (topicMetrics == null) {
                topicMetrics = new TopicMetrics();
                topicMetricsMap.put(topicPartition.topic(), topicMetrics);
            }
            topicMetrics.totalSpoutLag += spoutLag;
            topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
            topicMetrics.totalLatestTimeOffset += latestTimeOffset;
            topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
            topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
            topicMetrics.totalRecordsInPartitions += recordsInPartition;
        }
        for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
            String topic = e.getKey();
            TopicMetrics topicMetrics = e.getValue();
            result.put(topic + "/totalSpoutLag", topicMetrics.totalSpoutLag);
            result.put(topic + "/totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
            result.put(topic + "/totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
            result.put(topic + "/totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
            result.put(topic + "/totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
            result.put(topic + "/totalRecordsInPartitions", topicMetrics.totalRecordsInPartitions);
        }
        LOG.debug("Metrics Tick: value : {}", result);
        return result;
    }

    /**
     * Running per-topic sums of the per-partition figures. Declared static because it never
     * touches the enclosing metric instance.
     */
    private static class TopicMetrics {
        long totalSpoutLag = 0L;
        long totalEarliestTimeOffset = 0L;
        long totalLatestTimeOffset = 0L;
        long totalLatestEmittedOffset = 0L;
        long totalLatestCompletedOffset = 0L;
        long totalRecordsInPartitions = 0L;

        private TopicMetrics() {
        }
    }
}

