/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout.metrics;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.metric.api.IMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOffsetMetric<K, V>
implements IMetric {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
    private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
    private final Supplier<Consumer<K, V>> consumerSupplier;

    public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<Consumer<K, V>> consumerSupplier) {
        this.offsetManagerSupplier = offsetManagerSupplier;
        this.consumerSupplier = consumerSupplier;
    }

    public Object getValueAndReset() {
        Map endOffsets;
        Map beginningOffsets;
        Map<TopicPartition, OffsetManager> offsetManagers = this.offsetManagerSupplier.get();
        Consumer<K, V> consumer = this.consumerSupplier.get();
        if (offsetManagers == null || offsetManagers.isEmpty() || consumer == null) {
            LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");
            return null;
        }
        HashMap<String, TopicMetrics> topicMetricsMap = new HashMap<String, TopicMetrics>();
        Set<TopicPartition> topicPartitions = offsetManagers.keySet();
        try {
            beginningOffsets = consumer.beginningOffsets(topicPartitions);
            endOffsets = consumer.endOffsets(topicPartitions);
        }
        catch (RetriableException e) {
            LOG.warn("Failed to get offsets from Kafka! Will retry on next metrics tick.", (Throwable)e);
            return null;
        }
        HashMap<String, Long> result = new HashMap<String, Long>();
        for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetManager offsetManager = entry.getValue();
            long latestTimeOffset = (Long)endOffsets.get(topicPartition);
            long earliestTimeOffset = (Long)beginningOffsets.get(topicPartition);
            long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
            long latestCompletedOffset = offsetManager.getCommittedOffset();
            long spoutLag = latestTimeOffset - latestCompletedOffset;
            long recordsInPartition = latestTimeOffset - earliestTimeOffset;
            String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
            result.put(metricPath + "/spoutLag", spoutLag);
            result.put(metricPath + "/earliestTimeOffset", earliestTimeOffset);
            result.put(metricPath + "/latestTimeOffset", latestTimeOffset);
            result.put(metricPath + "/latestEmittedOffset", latestEmittedOffset);
            result.put(metricPath + "/latestCompletedOffset", latestCompletedOffset);
            result.put(metricPath + "/recordsInPartition", recordsInPartition);
            TopicMetrics topicMetrics = (TopicMetrics)topicMetricsMap.get(topicPartition.topic());
            if (topicMetrics == null) {
                topicMetrics = new TopicMetrics();
                topicMetricsMap.put(topicPartition.topic(), topicMetrics);
            }
            topicMetrics.totalSpoutLag += spoutLag;
            topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
            topicMetrics.totalLatestTimeOffset += latestTimeOffset;
            topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
            topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
            topicMetrics.totalRecordsInPartitions += recordsInPartition;
        }
        for (Map.Entry<Object, OffsetManager> entry : topicMetricsMap.entrySet()) {
            String topic = (String)entry.getKey();
            TopicMetrics topicMetrics = (TopicMetrics)((Object)entry.getValue());
            result.put(topic + "/totalSpoutLag", topicMetrics.totalSpoutLag);
            result.put(topic + "/totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
            result.put(topic + "/totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
            result.put(topic + "/totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
            result.put(topic + "/totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
            result.put(topic + "/totalRecordsInPartitions", topicMetrics.totalRecordsInPartitions);
        }
        LOG.debug("Metrics Tick: value : {}", result);
        return result;
    }

    private class TopicMetrics {
        long totalSpoutLag = 0L;
        long totalEarliestTimeOffset = 0L;
        long totalLatestTimeOffset = 0L;
        long totalLatestEmittedOffset = 0L;
        long totalLatestCompletedOffset = 0L;
        long totalRecordsInPartitions = 0L;

        private TopicMetrics() {
        }
    }
}

