package org.apache.eagle.metric.kafka;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.eagle.metric.reportor.EagleGaugeMetric;
import org.apache.eagle.metric.reportor.EagleMetric;
import org.apache.eagle.metric.reportor.EagleMetricListener;
import org.apache.eagle.metric.reportor.EagleServiceReporterMetricListener;
import org.apache.eagle.metric.reportor.MetricKeyCodeDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eagle/metric/kafka/KafkaOffsetSpout.class */
/**
 * Storm spout that periodically measures Kafka consumer lag and hands it to an
 * {@link EagleMetricListener}.
 *
 * <p>The spout declares no output fields and never uses the output collector; it only uses
 * {@link #nextTuple()} as a polling hook. At most once per {@link #DEFAULT_ROUND_INTERVALS}
 * it fetches the consumer offsets and the latest offsets of the configured topic, computes
 * one gauge metric per partition ({@code latest - consumed}) and flushes the batch to the
 * listener created in {@link #open}.
 */
public class KafkaOffsetSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1;
    /** Minimum time between two reporting rounds, and the granularity metric timestamps are trimmed to. */
    private static final long DEFAULT_ROUND_INTERVALS = 60000;
    /** How long every {@link #nextTuple()} call blocks before returning, in milliseconds. */
    private static final long IDLE_SLEEP_MILLIS = 10000L;
    /** Latest-offset value that is propagated unchanged as the lag instead of being subtracted from. */
    private static final long UNKNOWN_OFFSET = -1L;
    private static final String LAG_METRIC_NAME = "eagle.kafka.message.consumer.lag";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetSpout.class);

    private KafkaOffsetCheckerConfig config;
    private KafkaConsumerOffsetFetcher consumerOffsetFetcher;
    private KafkaLatestOffsetFetcher latestOffsetFetcher;
    /** Dimensions (site, topic, group) shared by every metric this spout produces. */
    private Map<String, String> baseMetricDimension;
    /** Trimmed timestamp of the last round that fetched offsets successfully; 0 before the first one. */
    private long lastRoundTime = 0;
    private EagleMetricListener listener;

    /**
     * @param kafkaOffsetCheckerConfig configuration read in {@link #open} to build the offset
     *                                 fetchers, the metric dimensions and the metric listener
     */
    public KafkaOffsetSpout(KafkaOffsetCheckerConfig kafkaOffsetCheckerConfig) {
        this.config = kafkaOffsetCheckerConfig;
    }

    /**
     * Builds the offset fetchers, the base metric dimensions and the reporting listener from
     * the configuration passed to the constructor. None of the three arguments is used.
     */
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(
                this.config.zkConfig, this.config.kafkaConfig.topic, this.config.kafkaConfig.group);
        this.latestOffsetFetcher = new KafkaLatestOffsetFetcher(this.config.kafkaConfig.kafkaEndPoints);

        Map<String, String> dimensions = new HashMap<String, String>();
        dimensions.put("site", this.config.kafkaConfig.site);
        dimensions.put("topic", this.config.kafkaConfig.topic);
        dimensions.put("group", this.config.kafkaConfig.group);
        this.baseMetricDimension = dimensions;

        String serviceHost = this.config.serviceConfig.serviceHost;
        Integer servicePort = this.config.serviceConfig.servicePort;
        this.listener = new EagleServiceReporterMetricListener(
                serviceHost, servicePort.intValue(),
                this.config.serviceConfig.username, this.config.serviceConfig.password);
    }

    /**
     * Creates the lag gauge for one partition.
     *
     * @param j   metric timestamp in milliseconds
     * @param str partition key, stored under the {@code "partition"} dimension
     * @param d   lag value
     * @return a gauge metric keyed by timestamp, metric name and dimensions
     */
    private EagleMetric constructMetric(long j, String str, double d) {
        Map<String, String> dimensions = new HashMap<String, String>(this.baseMetricDimension);
        dimensions.put("partition", str);
        String metricKey = MetricKeyCodeDecoder.codeTSMetricKey(j, LAG_METRIC_NAME, dimensions);
        return new EagleGaugeMetric(j, metricKey, d);
    }

    /**
     * Rounds {@code j} down to a multiple of {@code j2} using integer division
     * (for non-negative {@code j}).
     */
    private long trimTimestamp(long j, long j2) {
        return (j / j2) * j2;
    }

    /**
     * Runs one reporting round if more than {@link #DEFAULT_ROUND_INTERVALS} milliseconds have
     * passed since the last successful one, then blocks for {@link #IDLE_SLEEP_MILLIS}.
     *
     * <p>Any exception raised while fetching or flushing is logged and not rethrown. When the
     * fetch fails {@code lastRoundTime} is left untouched, so the round is attempted again on
     * the next call.
     */
    public void nextTuple() {
        long now = System.currentTimeMillis();
        if (now - this.lastRoundTime > DEFAULT_ROUND_INTERVALS) {
            try {
                long roundTimestamp = trimTimestamp(now, DEFAULT_ROUND_INTERVALS);
                Map<String, Long> consumedOffsets = this.consumerOffsetFetcher.fetch();
                Map<Integer, Long> latestOffsets =
                        this.latestOffsetFetcher.fetch(this.config.kafkaConfig.topic, consumedOffsets.size());
                ArrayList<EagleMetric> metrics = buildLagMetrics(roundTimestamp, consumedOffsets, latestOffsets);
                // Mark the round as done before flushing so a listener failure does not
                // trigger an immediate re-fetch on the next call.
                this.lastRoundTime = roundTimestamp;
                this.listener.onMetricFlushed(metrics);
            } catch (Exception e) {
                LOG.error("Got an exception, ex: ", e);
            }
        }
        pauseBetweenPolls();
    }

    /**
     * Computes one lag metric per consumer-offset entry.
     *
     * <p>Entries whose key does not yield a partition id, or for which either offset is
     * missing, are skipped with a warning so that one bad partition does not discard the
     * metrics of all the others.
     */
    private ArrayList<EagleMetric> buildLagMetrics(long roundTimestamp,
                                                   Map<String, Long> consumedOffsets,
                                                   Map<Integer, Long> latestOffsets) {
        ArrayList<EagleMetric> metrics = new ArrayList<EagleMetric>(consumedOffsets.size());
        for (Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            String partitionKey = entry.getKey();
            Integer partitionId = parsePartitionId(partitionKey);
            if (partitionId == null) {
                LOG.warn("Skipping consumer offset entry with unparsable partition key: {}", partitionKey);
                continue;
            }
            Long latest = latestOffsets.get(partitionId);
            Long consumed = entry.getValue();
            if (latest == null || consumed == null) {
                LOG.warn("Skipping partition {}: latest offset = {}, consumer offset = {}",
                        partitionKey, latest, consumed);
                continue;
            }
            // A latest offset of -1 is reported as a lag of -1 rather than being subtracted from.
            long lag = latest.longValue() == UNKNOWN_OFFSET
                    ? UNKNOWN_OFFSET
                    : latest.longValue() - consumed.longValue();
            metrics.add(constructMetric(roundTimestamp, partitionKey, lag));
        }
        return metrics;
    }

    /**
     * Extracts the partition id from a consumer-offset key, taken as the second
     * {@code '_'}-separated token.
     *
     * @return the partition id, or {@code null} when the key is null, has fewer than two
     *         tokens, or the second token is not an integer
     */
    private static Integer parsePartitionId(String partitionKey) {
        if (partitionKey == null) {
            return null;
        }
        String[] tokens = partitionKey.split("_");
        if (tokens.length < 2) {
            return null;
        }
        try {
            return Integer.valueOf(tokens[1]);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Sleeps between polls; an interrupt ends the sleep early and is re-asserted on the thread. */
    private static void pauseBetweenPolls() {
        try {
            Thread.sleep(IDLE_SLEEP_MILLIS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status so the calling executor can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /** Declares nothing: this spout emits no tuples. */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    /** No-op: no tuples are emitted, so there is nothing to acknowledge. */
    public void ack(Object obj) {
    }

    /** No-op: no tuples are emitted, so there is nothing to replay. */
    public void fail(Object obj) {
    }

    /** No-op. */
    public void deactivate() {
    }

    /** No-op. */
    public void close() {
    }
}
