package org.apache.eagle.metric.kafka;

import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.base.BaseRichSpout;
import com.typesafe.config.Config;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
import org.apache.eagle.datastream.ExecutionEnvironments;
import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

/* loaded from: input_file:org/apache/eagle/metric/kafka/EagleMetricCollectorMain.class */
public class EagleMetricCollectorMain {
    private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorMain.class);

    public static void main(String[] strArr) throws Exception {
        StormExecutionEnvironment storm = ExecutionEnvironments.getStorm(strArr);
        Config config = storm.getConfig();
        final KafkaSourcedSpoutScheme kafkaSourcedSpoutScheme = new KafkaSourcedSpoutScheme(config.getString("dataSourceConfig.deserializerClass"), config) { // from class: org.apache.eagle.metric.kafka.EagleMetricCollectorMain.1
            public List<Object> deserialize(byte[] bArr) {
                Object deserialize = this.deserializer.deserialize(bArr);
                Map map = (Map) deserialize;
                if (deserialize == null) {
                    return null;
                }
                return Arrays.asList(map.get("user"), map.get("timestamp"));
            }
        };
        KafkaSourcedSpoutProvider kafkaSourcedSpoutProvider = new KafkaSourcedSpoutProvider() { // from class: org.apache.eagle.metric.kafka.EagleMetricCollectorMain.2
            public BaseRichSpout getSpout(Config config2) {
                String string = config2.getString("dataSourceConfig.topic");
                String string2 = config2.getString("dataSourceConfig.metricCollectionConsumerId");
                int i = config2.getInt("dataSourceConfig.fetchSize");
                config2.getString("dataSourceConfig.deserializerClass");
                String string3 = config2.getString("dataSourceConfig.zkQuorum");
                String string4 = config2.getString("dataSourceConfig.transactionZKRoot");
                EagleMetricCollectorMain.LOG.info(String.format("Use topic id: %s", string));
                String string5 = config2.hasPath("dataSourceConfig.brokerZkPath") ? config2.getString("dataSourceConfig.brokerZkPath") : null;
                SpoutConfig spoutConfig = new SpoutConfig(string5 == null ? new ZkHosts(string3) : new ZkHosts(string3, string5), string, string4 + "/" + string, string2);
                String[] split = string3.split(",");
                ArrayList arrayList = new ArrayList();
                for (String str : split) {
                    arrayList.add(str.split(":")[0]);
                }
                Integer valueOf = Integer.valueOf(split[0].split(":")[1]);
                spoutConfig.zkServers = arrayList;
                spoutConfig.zkPort = valueOf;
                spoutConfig.stateUpdateIntervalMs = config2.getLong("dataSourceConfig.transactionStateUpdateMS");
                spoutConfig.fetchSizeBytes = i;
                spoutConfig.scheme = new SchemeAsMultiScheme(kafkaSourcedSpoutScheme);
                return new KafkaSpout(spoutConfig);
            }
        };
        storm.fromSpout(new KafkaOffsetSourceSpoutProvider()).withOutputFields(0).nameAs("kafkaLogLagChecker");
        storm.fromSpout(kafkaSourcedSpoutProvider).withOutputFields(2).nameAs("kafkaMessageFetcher").groupBy(Arrays.asList(0)).flatMap(new KafkaMessageDistributionExecutor());
        storm.execute();
    }
}
