package org.apache.eagle.metric.kafka;

import com.codahale.metrics.MetricRegistry;
import com.typesafe.config.Config;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.eagle.datastream.Collector;
import org.apache.eagle.datastream.JavaStormStreamExecutor1;
import org.apache.eagle.metric.reportor.EagleCounterMetric;
import org.apache.eagle.metric.reportor.EagleMetric;
import org.apache.eagle.metric.reportor.EagleMetricListener;
import org.apache.eagle.metric.reportor.EagleServiceReporterMetricListener;
import org.apache.eagle.metric.reportor.MetricKeyCodeDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple1;

/* loaded from: input_file:org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.class */
/**
 * Stream executor that counts incoming tuples per metric key and keeps one
 * {@link EagleCounterMetric} per key in a local {@link MetricRegistry}.
 *
 * <p>The metric key is built from the configured {@code site} and {@code topic}
 * plus the first tuple field, which is stored under the {@code user} dimension.
 * Every metric created here gets the same {@link EagleMetricListener}, built in
 * {@link #init()} from the {@code eagleProps.eagleService.*} settings.
 *
 * <p>{@link #prepareConfig(Config)} must be called before {@link #init()}, and
 * {@link #init()} before {@link #flatMap(List, Collector)}; the fields those
 * methods read are otherwise still {@code null}.
 */
public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<String> {
    /** Factor applied to the configured interval; presumably milliseconds per minute - confirm against EagleCounterMetric. */
    private static final long MILLIS_PER_MINUTE = 60000L;
    /** Granularity used when {@code dataSourceConfig.kafkaDistributionDataIntervalMin} is not configured. */
    private static final long DEFAULT_METRIC_GRANULARITY = MILLIS_PER_MINUTE;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionExecutor.class);

    private Config config;
    /** Dimensions shared by every metric key: {@code site} and {@code topic}. */
    private Map<String, String> baseMetricDimension;
    private MetricRegistry registry;
    private EagleMetricListener listener;
    private long granularity;

    /**
     * Stores the configuration that {@link #init()} reads later.
     *
     * @param config the application configuration
     */
    public void prepareConfig(Config config) {
        this.config = config;
    }

    /**
     * Builds the base dimensions, the metric registry, the granularity and the
     * reporting listener from the stored configuration.
     *
     * <p>Required keys: {@code dataSourceConfig.site}, {@code dataSourceConfig.topic},
     * {@code eagleProps.eagleService.host}, {@code .port}, {@code .username} and
     * {@code .password}. Optional key: {@code dataSourceConfig.kafkaDistributionDataIntervalMin},
     * which is multiplied by 60000 to obtain the granularity.
     */
    public void init() {
        String site = this.config.getString("dataSourceConfig.site");
        String topic = this.config.getString("dataSourceConfig.topic");

        Map<String, String> dimensions = new HashMap<String, String>();
        dimensions.put("site", site);
        dimensions.put("topic", topic);
        this.baseMetricDimension = dimensions;

        this.registry = new MetricRegistry();

        this.granularity = DEFAULT_METRIC_GRANULARITY;
        if (this.config.hasPath("dataSourceConfig.kafkaDistributionDataIntervalMin")) {
            // int * long widens to long, so large intervals do not overflow int arithmetic.
            this.granularity = this.config.getInt("dataSourceConfig.kafkaDistributionDataIntervalMin") * MILLIS_PER_MINUTE;
        }

        String host = this.config.getString("eagleProps.eagleService.host");
        int port = this.config.getInt("eagleProps.eagleService.port");
        String username = this.config.getString("eagleProps.eagleService.username");
        String password = this.config.getString("eagleProps.eagleService.password");
        this.listener = new EagleServiceReporterMetricListener(host, port, username, password);
    }

    /**
     * Encodes the metric key for {@code eagle.kafka.message.count} using the base
     * dimensions plus the given value under the {@code user} dimension.
     *
     * @param str value stored under the {@code user} dimension
     * @return the key produced by {@link MetricKeyCodeDecoder#codeMetricKey}
     */
    public String generateMetricKey(String str) {
        // Copy so the shared base dimensions are never mutated.
        Map<String, String> dimensions = new HashMap<String, String>(this.baseMetricDimension);
        dimensions.put("user", str);
        return MetricKeyCodeDecoder.codeMetricKey("eagle.kafka.message.count", dimensions);
    }

    /**
     * Registers a new counter metric for the tuple's key, or adds 1 to the
     * existing one. Nothing is emitted to {@code collector}.
     *
     * <p>Failures are logged and swallowed so a single bad tuple does not
     * propagate an exception to the caller.
     *
     * @param list tuple fields; element 0 is cast to {@code String} and used as the
     *     {@code user} dimension, element 1 is cast to {@code Long} and passed to the
     *     metric alongside the increment (presumably a timestamp - confirm)
     * @param collector output collector; not used by this executor
     */
    public void flatMap(List<Object> list, Collector<Tuple1<String>> collector) {
        if (list == null || list.size() < 2) {
            LOG.error("Ignoring malformed tuple, expected at least 2 fields: {}", list);
            return;
        }
        try {
            String user = (String) list.get(0);
            long clock = ((Long) list.get(1)).longValue();
            String metricKey = generateMetricKey(user);

            // Single lookup; the result decides between registering and updating.
            Object existing = this.registry.getMetrics().get(metricKey);
            if (existing == null) {
                EagleCounterMetric counter = new EagleCounterMetric(clock, metricKey, 1.0d, this.granularity);
                counter.registerListener(this.listener);
                this.registry.register(metricKey, counter);
            } else {
                ((EagleMetric) existing).update(1.0d, clock);
            }
        } catch (Exception e) {
            LOG.error("Got an exception, ex: ", e);
        }
    }
}
