/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.metric;

import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.generated.WorkerUploadMetrics;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.Values;
import backtype.storm.utils.NimbusClient;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
import com.alibaba.jstorm.daemon.supervisor.SupervisorManger;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.metric.AsmMetricRegistry;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.execute.BoltCollector;
import com.alibaba.jstorm.task.execute.spout.SpoutCollector;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reports the metrics gathered by {@link JStormMetrics} for the process that owns this reporter
 * (a worker, nimbus or a supervisor, chosen by the {@code role} object given to the constructor).
 *
 * <p>Two loop callbacks do the work:
 * <ul>
 *   <li>{@link CheckMetricMetaThread} looks for metrics that have no metric id yet and registers
 *       their names through the nimbus client;</li>
 *   <li>{@link FlushMetricThread} flushes every metric and then calls {@link #doUpload()}.</li>
 * </ul>
 *
 * <p>Inside a worker the metrics are emitted through the bolt/spout collector handed to
 * {@link #setOutputCollector(Object)}; otherwise they are uploaded through the nimbus client.
 */
public class JStormMetricsReporter {
    private static final Logger LOG = LoggerFactory.getLogger(JStormMetricsReporter.class);

    /** Configuration key the worker's topology id is read from. */
    private static final String TOPOLOGY_ID_CONF_KEY = "topology.id";
    /** Topology id used when the reporter runs for a {@link NimbusData} role. */
    private static final String NIMBUS_TOPOLOGY_ID = "__NIMBUS__";
    /** Topology id used when the reporter runs for a {@link SupervisorManger} role. */
    private static final String SUPERVISOR_TOPOLOGY_ID = "__SUPERVISOR__";
    /** First argument handed to {@code emitCtrl} when emitting metrics through a collector. */
    private static final String MASTER_METRICS_ID = "__master_metrics";
    /** Value returned by {@link CheckMetricMetaThread#getResult()}; logged as seconds. */
    private static final int CHECK_META_CYCLE_SECS = 30;
    /** Value returned by {@link FlushMetricThread#getResult()}; logged as seconds. */
    private static final int FLUSH_METRIC_CYCLE_SECS = 1;

    private Map conf;
    protected String clusterName;
    protected String topologyId;
    protected String host;
    protected int port;
    protected boolean localMode = false;
    private AsyncLoopThread checkMetricMetaThread;
    protected final int checkMetaThreadCycle;
    private AsyncLoopThread flushMetricThread;
    protected final int flushMetricThreadCycle;
    private boolean test = false;
    private boolean isInWorker = false;
    private SpoutOutputCollector spoutOutput;
    private OutputCollector boltOutput;
    private NimbusClient client = null;
    /**
     * Guards {@link #client}. Both loop callbacks reach the client (metric registration and, outside
     * a worker, metric upload); they are presumably driven by different threads, so access is
     * serialized here.
     */
    private final Object clientLock = new Object();

    /**
     * Creates a reporter for the given role and applies the metric related settings of the role's
     * configuration to {@link JStormMetrics}.
     *
     * @param role a {@link WorkerData}, {@link NimbusData} or {@link SupervisorManger} instance
     * @throws IllegalArgumentException if {@code role} is of any other type (no configuration
     *         could be obtained for it)
     */
    public JStormMetricsReporter(Object role) {
        LOG.info("starting jstorm metrics reporter in {}", role.getClass().getSimpleName());
        if (role instanceof WorkerData) {
            WorkerData workerData = (WorkerData) role;
            this.conf = workerData.getStormConf();
            this.topologyId = (String) this.conf.get(TOPOLOGY_ID_CONF_KEY);
            this.port = workerData.getPort();
            this.isInWorker = true;
        } else if (role instanceof NimbusData) {
            NimbusData nimbusData = (NimbusData) role;
            this.conf = nimbusData.getConf();
            this.topologyId = NIMBUS_TOPOLOGY_ID;
        } else if (role instanceof SupervisorManger) {
            SupervisorManger supervisor = (SupervisorManger) role;
            this.conf = supervisor.getConf();
            this.topologyId = SUPERVISOR_TOPOLOGY_ID;
            JStormMetrics.setTopologyId(this.topologyId);
        } else {
            // Without a known role there is no configuration to read; fail with a clear message
            // instead of continuing with a null conf.
            throw new IllegalArgumentException(
                    "unsupported role for metrics reporter: " + role.getClass().getName());
        }

        JStormMetrics.enabled = ConfigExtension.isEnableMetrics(this.conf);
        JStormMetrics.setDebug(ConfigExtension.isEnableMetricDebug(this.conf));
        JStormMetrics.addDebugMetrics(ConfigExtension.getDebugMetricNames(this.conf));
        JStormMetrics.setTimerUpdateInterval(ConfigExtension.getTimerUpdateInterval(this.conf));
        this.host = JStormMetrics.getHost();

        if (!JStormMetrics.enabled) {
            LOG.warn("***** topology metrics is disabled! *****");
        } else {
            LOG.info("topology metrics is enabled.");
        }

        this.checkMetaThreadCycle = CHECK_META_CYCLE_SECS;
        this.flushMetricThreadCycle = FLUSH_METRIC_CYCLE_SECS;
        LOG.info("check meta thread freq: {} sec, flush metrics thread freq: {} sec",
                this.checkMetaThreadCycle, this.flushMetricThreadCycle);

        this.localMode = StormConfig.local_mode(this.conf);
        this.clusterName = ConfigExtension.getClusterName(this.conf);
        LOG.info("done.");
    }

    /**
     * Test-only constructor: marks the reporter as a test instance, which turns
     * {@link #doUpload()} and metric registration into no-ops.
     */
    @VisibleForTesting
    JStormMetricsReporter() {
        LOG.info("Successfully started jstorm metrics reporter for test.");
        this.test = true;
        this.flushMetricThreadCycle = FLUSH_METRIC_CYCLE_SECS;
        this.checkMetaThreadCycle = CHECK_META_CYCLE_SECS;
    }

    /**
     * Creates the two loop threads unless the reporter runs in local mode or metrics are disabled.
     * Loop threads that already exist are kept, so calling this repeatedly (for instance when
     * metrics are re-enabled at runtime) does not create duplicates.
     */
    public void init() {
        if (this.localMode || !JStormMetrics.enabled) {
            return;
        }
        if (this.checkMetricMetaThread == null) {
            this.checkMetricMetaThread = new AsyncLoopThread(new CheckMetricMetaThread());
        }
        if (this.flushMetricThread == null) {
            this.flushMetricThread = new AsyncLoopThread(new FlushMetricThread());
        }
    }

    /**
     * Asks nimbus to register the given metric names for this reporter's topology id.
     *
     * @param names metric names to register
     * @return the name-to-id map returned by nimbus; an empty map when running as a test instance
     *         or with metrics disabled; {@code null} when the call failed
     */
    private Map<String, Long> registerMetrics(Set<String> names) {
        if (this.test || !JStormMetrics.enabled) {
            return new HashMap<String, Long>();
        }
        synchronized (this.clientLock) {
            try {
                return this.connectedClient().getClient().registerMetrics(this.topologyId, names);
            } catch (Exception e) {
                LOG.error("Failed to gen metric ids", e);
                this.resetClient();
                return null;
            }
        }
    }

    /**
     * Calls {@code cleanup()} on every loop thread created by {@link #init()} and forgets them.
     *
     * <p>The check is made on the thread references rather than on {@code JStormMetrics.enabled}:
     * {@link #updateMetricConfig(Map)} clears that flag <em>before</em> calling this method, so a
     * flag based guard would skip the cleanup exactly when metrics are being switched off. It also
     * makes the method safe to call when {@link #init()} never ran.
     */
    public void shutdown() {
        AsyncLoopThread metaThread = this.checkMetricMetaThread;
        this.checkMetricMetaThread = null;
        if (metaThread != null) {
            metaThread.cleanup();
        }

        AsyncLoopThread flushThread = this.flushMetricThread;
        this.flushMetricThread = null;
        if (flushThread != null) {
            flushThread.cleanup();
        }
    }

    /**
     * Computes all metrics and, if there are any, hands them to
     * {@link #uploadMetric(WorkerUploadMetrics)}. Does nothing for a test instance. Exceptions are
     * logged and not propagated.
     */
    public void doUpload() {
        if (this.test) {
            return;
        }
        try {
            long start = System.currentTimeMillis();
            MetricInfo workerMetricInfo = JStormMetrics.computeAllMetrics();
            if (workerMetricInfo.get_metrics_size() <= 0) {
                LOG.info("No metrics to upload.");
                return;
            }

            WorkerUploadMetrics upload = new WorkerUploadMetrics();
            upload.set_topologyId(this.topologyId);
            upload.set_supervisorId(this.host);
            upload.set_port(this.port);
            upload.set_allMetrics(workerMetricInfo);

            this.uploadMetric(upload);
            LOG.info("Successfully upload worker metrics, size:{}, cost:{}",
                    workerMetricInfo.get_metrics_size(), System.currentTimeMillis() - start);
        } catch (Exception e) {
            LOG.error("Failed to upload worker metrics", e);
        }
    }

    /**
     * Sends the given metrics: through the bolt or spout collector when running inside a worker,
     * through the nimbus client otherwise.
     *
     * @param metrics metrics to send
     */
    public void uploadMetric(WorkerUploadMetrics metrics) {
        if (this.isInWorker) {
            this.emitThroughCollector(metrics);
        } else {
            this.uploadThroughNimbusClient(metrics);
        }
    }

    /** Emits the metrics through the bolt collector if one is set, else through the spout collector. */
    private void emitThroughCollector(WorkerUploadMetrics metrics) {
        if (this.boltOutput != null) {
            LOG.info("emit metrics through bolt collector.");
            ((BoltCollector) this.boltOutput.getDelegate()).emitCtrl(MASTER_METRICS_ID, null,
                    new Values(JStormServerUtils.getName(this.host, this.port), metrics));
        } else if (this.spoutOutput != null) {
            LOG.info("emit metrics through spout collector.");
            ((SpoutCollector) this.spoutOutput.getDelegate()).emitCtrl(MASTER_METRICS_ID,
                    new Values(JStormServerUtils.getName(this.host, this.port), metrics), null);
        } else {
            // Previously the metrics were dropped without any trace in this case.
            LOG.warn("no output collector set, worker metrics are not emitted.");
        }
    }

    /** Wraps the worker metrics in a {@link TopologyMetric} and uploads it through the nimbus client. */
    private void uploadThroughNimbusClient(WorkerUploadMetrics metrics) {
        LOG.info("emit metrics through nimbus client.");
        TopologyMetric tpMetric = MetricUtils.mkTopologyMetric();
        tpMetric.set_workerMetric(metrics.get_allMetrics());
        synchronized (this.clientLock) {
            try {
                this.connectedClient().getClient().uploadTopologyMetrics(this.topologyId, tpMetric);
            } catch (Exception ex) {
                LOG.error("upload metric error:", ex);
                this.resetClient();
            }
        }
    }

    /**
     * Returns the nimbus client, creating it on first use or after {@link #resetClient()}.
     * Must be called with {@link #clientLock} held.
     */
    private NimbusClient connectedClient() {
        if (this.client == null) {
            this.client = NimbusClient.getConfiguredClient(this.conf);
        }
        return this.client;
    }

    /**
     * Closes and forgets the current nimbus client so that the next call reconnects lazily.
     * Reconnecting here, inside a failure handler, could throw again and would leave the field
     * pointing at an already closed client. Must be called with {@link #clientLock} held.
     */
    private void resetClient() {
        NimbusClient stale = this.client;
        this.client = null;
        if (stale == null) {
            return;
        }
        try {
            stale.close();
        } catch (Exception closeError) {
            LOG.warn("Failed to close nimbus client", closeError);
        }
    }

    /**
     * Remembers the collector used to emit metrics inside a worker.
     *
     * @param outputCollector an {@link OutputCollector} or a {@link SpoutOutputCollector}; any
     *        other object is ignored
     */
    public void setOutputCollector(Object outputCollector) {
        if (outputCollector instanceof OutputCollector) {
            this.boltOutput = (OutputCollector) outputCollector;
        } else if (outputCollector instanceof SpoutOutputCollector) {
            this.spoutOutput = (SpoutOutputCollector) outputCollector;
        }
    }

    /**
     * Applies a changed configuration: per-metric enable/disable lists, the histogram update
     * interval, the stream-metrics switch and the global metrics switch. Switching the global
     * switch on calls {@link #init()}, switching it off calls {@link #shutdown()}.
     *
     * @param newConf the new configuration
     */
    public void updateMetricConfig(Map newConf) {
        String enabledMetrics = ConfigExtension.getEnabledMetricNames(newConf);
        String disabledMetrics = ConfigExtension.getDisabledMetricNames(newConf);
        if (enabledMetrics != null || disabledMetrics != null) {
            this.applyMetricSwitches(this.toSet(enabledMetrics, ","), this.toSet(disabledMetrics, ","));
        }

        long updateInterval = ConfigExtension.getTimerUpdateInterval(newConf);
        if (updateInterval != AsmHistogram.getUpdateInterval()) {
            AsmHistogram.setUpdateInterval(updateInterval);
        }

        boolean enableStreamMetrics = ConfigExtension.isEnableStreamMetrics(newConf);
        if (enableStreamMetrics != JStormMetrics.enableStreamMetrics) {
            JStormMetrics.enableStreamMetrics = enableStreamMetrics;
            LOG.info("switch topology stream metric enable to {}", enableStreamMetrics);
        }

        boolean enableMetrics = ConfigExtension.isEnableMetrics(newConf);
        if (enableMetrics != JStormMetrics.enabled) {
            JStormMetrics.enabled = enableMetrics;
            if (enableMetrics) {
                this.init();
            } else {
                this.shutdown();
            }
            LOG.info("switch topology metric enable to {}", enableMetrics);
        }
    }

    /**
     * Enables every registered metric whose name is in {@code enabledNames} and disables every
     * one whose name is in {@code disabledNames}; a name present in both sets ends up enabled.
     */
    private void applyMetricSwitches(Set<String> enabledNames, Set<String> disabledNames) {
        AsmMetricRegistry[] registries = new AsmMetricRegistry[]{
                JStormMetrics.getTopologyMetrics(),
                JStormMetrics.getComponentMetrics(),
                JStormMetrics.getTaskMetrics(),
                JStormMetrics.getStreamMetrics(),
                JStormMetrics.getNettyMetrics(),
                JStormMetrics.getWorkerMetrics()
        };
        for (AsmMetricRegistry registry : registries) {
            Collection<AsmMetric> metrics = registry.getMetrics().values();
            for (AsmMetric metric : metrics) {
                String metricName = metric.getMetricName();
                if (enabledNames.contains(metricName)) {
                    metric.setEnabled(true);
                } else if (disabledNames.contains(metricName)) {
                    metric.setEnabled(false);
                }
            }
        }
    }

    /**
     * Splits {@code items} on {@code delim} (interpreted as a regular expression by
     * {@link String#split(String)}) into a set of trimmed, non-blank entries.
     *
     * @param items delimited list, may be {@code null} or blank
     * @param delim delimiter regular expression
     * @return the entries; empty when {@code items} is {@code null} or blank
     */
    private Set<String> toSet(String items, String delim) {
        Set<String> ret = new HashSet<String>();
        if (StringUtils.isBlank(items)) {
            return ret;
        }
        for (String item : items.split(delim)) {
            if (StringUtils.isBlank(item)) {
                continue;
            }
            // Trim so that a list written as "a, b" still matches the metric named "b".
            ret.add(item.trim());
        }
        return ret;
    }

    /**
     * Loop callback that registers metrics which do not have a metric id yet. The first runs are
     * skipped until an initial delay of 30 to 44 seconds (randomized per instance) has elapsed.
     */
    class CheckMetricMetaThread
    extends RunnableCallback {
        private volatile boolean processing = false;
        private final long start = TimeUtils.current_time_secs();
        private final long initialDelay = 30 + new Random().nextInt(15);

        @Override
        public void run() {
            if ((long) TimeUtils.current_time_secs() - this.start < this.initialDelay) {
                return;
            }
            if (this.processing) {
                LOG.info("still processing, skip...");
                return;
            }

            this.processing = true;
            long beginMillis = System.currentTimeMillis();
            try {
                Set<String> names = this.collectUnregisteredNames();
                if (names.isEmpty()) {
                    return;
                }
                Map<String, Long> nameIdMap = JStormMetricsReporter.this.registerMetrics(names);
                if (nameIdMap != null) {
                    this.assignMetricIds(nameIdMap);
                }
                LOG.info("register metrics, size:{}, cost:{}",
                        names.size(), System.currentTimeMillis() - beginMillis);
            } catch (Exception ex) {
                LOG.error("Error", ex);
            } finally {
                // Reset in finally so that no kind of failure can leave the flag stuck and make
                // every later run skip.
                this.processing = false;
            }
        }

        /** Collects the names of all metrics whose op has bit 2 set and whose metric id is still 0. */
        private Set<String> collectUnregisteredNames() {
            Set<String> names = new HashSet<String>();
            for (AsmMetricRegistry registry : JStormMetrics.allRegistries) {
                Map<String, AsmMetric> metricMap = registry.getMetrics();
                for (Map.Entry<String, AsmMetric> metricEntry : metricMap.entrySet()) {
                    AsmMetric metric = metricEntry.getValue();
                    // NOTE(review): 2 is presumably the "report" op flag - confirm against AsmMetric.
                    if ((metric.getOp() & 2) == 2 && metric.getMetricId() == 0L) {
                        names.add(metricEntry.getKey());
                    }
                }
            }
            return names;
        }

        /** Stores the returned ids on the metrics that can still be found by name. */
        private void assignMetricIds(Map<String, Long> nameIdMap) {
            for (Map.Entry<String, Long> nameId : nameIdMap.entrySet()) {
                String name = nameId.getKey();
                Long id = nameId.getValue();
                AsmMetric metric = JStormMetrics.find(name);
                if (metric == null || id == null) {
                    continue;
                }
                metric.setMetricId(id);
                LOG.info("set metric id, {}:{}", name, id);
            }
        }

        @Override
        public Object getResult() {
            return JStormMetricsReporter.this.checkMetaThreadCycle;
        }
    }

    /**
     * Loop callback that, whenever {@code TimeUtils.isTimeAligned()} holds, flushes every metric
     * of every registry and then triggers {@link JStormMetricsReporter#doUpload()}.
     */
    class FlushMetricThread
    extends RunnableCallback {

        @Override
        public void run() {
            if (!TimeUtils.isTimeAligned()) {
                return;
            }
            int cnt = 0;
            try {
                for (AsmMetricRegistry registry : JStormMetrics.allRegistries) {
                    for (Map.Entry<String, AsmMetric> entry : registry.getMetrics().entrySet()) {
                        entry.getValue().flush();
                        ++cnt;
                    }
                }
                LOG.info("flush metrics, total:{}.", cnt);
                JStormMetricsReporter.this.doUpload();
            } catch (Exception ex) {
                LOG.error("Error", ex);
            }
        }

        @Override
        public Object getResult() {
            return JStormMetricsReporter.this.flushMetricThreadCycle;
        }
    }
}

