package com.alibaba.jstorm.metric;

import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.generated.WorkerUploadMetrics;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.Values;
import backtype.storm.utils.NimbusClientWrapper;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.config.Refreshable;
import com.alibaba.jstorm.config.RefreshableComponents;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.supervisor.SupervisorManger;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.task.execute.BoltCollector;
import com.alibaba.jstorm.task.execute.spout.SpoutCollector;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.com.google.common.annotations.VisibleForTesting;
import shade.storm.org.apache.commons.lang.StringUtils;

/* loaded from: input_file:com/alibaba/jstorm/metric/JStormMetricsReporter.class */
public class JStormMetricsReporter implements Refreshable {
    /** Class-wide SLF4J logger; also used by the two inner callback classes. */
    private static final Logger LOG = LoggerFactory.getLogger(JStormMetricsReporter.class);
    /** Configuration map taken from the role object passed to the constructor; never assigned by the test constructor. */
    private Map conf;
    /** Cluster name read from {@link #conf} via {@code ConfigExtension.getClusterName}. */
    protected String clusterName;
    /** Topology id: the "topology.id" conf entry for workers, or the nimbus/supervisor metric key otherwise. */
    protected String topologyId;
    /** Host name obtained from {@code JStormMetrics.getHost()}; sent as the supervisor id on uploads. */
    protected String host;
    /** Worker port; only assigned when the reporter is built from a {@code WorkerData}. */
    protected int port;
    /** Result of {@code StormConfig.local_mode(conf)}. */
    protected boolean localMode;
    /** Loop thread running {@link CheckMetricMetaThread}; created in {@link #init()}. */
    private AsyncLoopThread checkMetricMetaThread;
    /** Value returned by {@code CheckMetricMetaThread.getResult()}; logged as seconds by the constructor. */
    protected final int checkMetaThreadCycle;
    /** Loop thread running {@link FlushMetricThread}; created in {@link #init()}. */
    private AsyncLoopThread flushMetricThread;
    /** Value returned by {@code FlushMetricThread.getResult()}; logged as seconds by the constructor. */
    protected final int flushMetricThreadCycle;
    /** True only when built through the no-arg test constructor; short-circuits registration and upload. */
    private boolean test;
    /** True when the reporter was built from a {@code WorkerData}; selects collector-based emission over the nimbus client. */
    private boolean inTopology;
    /** Spout collector set through {@link #setOutputCollector(Object)}; may be null. */
    private volatile SpoutOutputCollector spoutOutput;
    /** Bolt collector set through {@link #setOutputCollector(Object)}; may be null. */
    private volatile OutputCollector boltOutput;
    /** Nimbus client, created lazily on the first non-topology upload and dropped after an upload error. */
    private NimbusClientWrapper client;
    /** Helper that performs metric registration for {@link #registerMetrics(Set)}. */
    private MetricsRegister metricsRegister;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/jstorm/metric/JStormMetricsReporter$CheckMetricMetaThread.class */
    public class CheckMetricMetaThread extends RunnableCallback {
        private volatile boolean processing = false;
        private final long start = TimeUtils.current_time_secs();
        private final long initialDelay = 15 + new Random().nextInt(15);

        CheckMetricMetaThread() {
        }

        @Override // com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
        public void run() {
            if (!JStormMetrics.enabled || TimeUtils.current_time_secs() - this.start < this.initialDelay) {
                return;
            }
            if (this.processing) {
                JStormMetricsReporter.LOG.debug("still processing, skip...");
                return;
            }
            this.processing = true;
            try {
                HashSet hashSet = new HashSet();
                for (AsmMetricRegistry asmMetricRegistry : JStormMetrics.allRegistries) {
                    for (Map.Entry<String, AsmMetric> entry : asmMetricRegistry.getMetrics().entrySet()) {
                        AsmMetric value = entry.getValue();
                        if ((value.getOp() & 2) == 2 && !MetricUtils.isValidId(value.getMetricId())) {
                            hashSet.add(entry.getKey());
                        }
                    }
                }
                if (!JStormMetricsReporter.this.inTopology) {
                    for (Map.Entry<String, AsmMetric> entry2 : JStormMetrics.workerMetrics.getMetrics().entrySet()) {
                        if ((entry2.getValue().getOp() & 2) == 2) {
                            hashSet.add(entry2.getKey());
                        }
                    }
                }
                if (hashSet.size() > 0) {
                    JStormMetricsReporter.LOG.debug("register metrics, size:{}", Integer.valueOf(hashSet.size()));
                    if (!JStormMetricsReporter.this.inTopology) {
                        Map registerMetrics = JStormMetricsReporter.this.registerMetrics(hashSet);
                        if (registerMetrics != null) {
                            for (String str : registerMetrics.keySet()) {
                                AsmMetric find = JStormMetrics.find(str);
                                if (find != null) {
                                    long longValue = ((Long) registerMetrics.get(str)).longValue();
                                    find.setMetricId(longValue);
                                    JStormMetricsReporter.LOG.debug("set metric id, {}:{}", str, Long.valueOf(longValue));
                                }
                            }
                        }
                    } else if (JStormMetricsReporter.this.spoutOutput != null) {
                        ((SpoutCollector) JStormMetricsReporter.this.spoutOutput.getDelegate()).emitCtrl(Common.TOPOLOGY_MASTER_REGISTER_METRICS_STREAM_ID, new Values(hashSet), null);
                    } else if (JStormMetricsReporter.this.boltOutput != null) {
                        ((BoltCollector) JStormMetricsReporter.this.boltOutput.getDelegate()).emitCtrl(Common.TOPOLOGY_MASTER_REGISTER_METRICS_STREAM_ID, null, new Values(hashSet));
                    } else {
                        JStormMetricsReporter.LOG.warn("topology:{}, both spout and bolt collectors are null, don't know what to do...", JStormMetricsReporter.this.topologyId);
                    }
                }
            } catch (Throwable th) {
                JStormMetricsReporter.LOG.error("Error", th);
            }
            this.processing = false;
        }

        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public Object getResult() {
            return Integer.valueOf(JStormMetricsReporter.this.checkMetaThreadCycle);
        }
    }

    /**
     * Periodic callback that flushes every metric in every registry and then triggers an
     * upload of the computed metric data.
     *
     * <p>Note: the decompiler widened this class from package-private to public.
     */
    public class FlushMetricThread extends RunnableCallback {
        FlushMetricThread() {
        }

        /**
         * Flushes all metrics and uploads them, but only when metrics are enabled and
         * {@code TimeUtils.isTimeAligned()} reports true. Exceptions are logged and swallowed.
         */
        @Override
        public void run() {
            if (!JStormMetrics.enabled || !TimeUtils.isTimeAligned()) {
                return;
            }
            int flushed = 0;
            try {
                for (AsmMetricRegistry registry : JStormMetrics.allRegistries) {
                    for (AsmMetric metric : registry.getMetrics().values()) {
                        metric.flush();
                        flushed++;
                    }
                }
                JStormMetricsReporter.LOG.debug("flush metrics, total:{}.", Integer.valueOf(flushed));
                JStormMetricsReporter.this.uploadMetricData();
            } catch (Exception e) {
                JStormMetricsReporter.LOG.error("Error", e);
            }
        }

        /** Returns the flush cycle value configured on the enclosing reporter. */
        @Override
        public Object getResult() {
            return Integer.valueOf(JStormMetricsReporter.this.flushMetricThreadCycle);
        }
    }

    /**
     * Builds a reporter for the given role object, which is expected to be a
     * {@code WorkerData}, {@code NimbusData} or {@code SupervisorManger}.
     *
     * <p>NOTE(review): for any other type {@code conf} and {@code topologyId} stay null and
     * are still passed on to {@link #refresh(Map)} and {@code MetricsRegister} — confirm
     * callers only ever pass the three supported types.
     *
     * @param obj the daemon/worker object that owns this reporter; must not be null
     */
    public JStormMetricsReporter(Object obj) {
        // localMode, test, inTopology and client start at their default values (false / null).
        LOG.info("starting jstorm metrics reporter in {}", obj.getClass().getSimpleName());
        bindRole(obj);

        // Apply the metric-related settings from the role's configuration.
        refresh(this.conf);
        this.metricsRegister = new MetricsRegister(this.conf, this.topologyId);
        this.host = JStormMetrics.getHost();

        if (!JStormMetrics.enabled) {
            LOG.warn("***** topology metrics is disabled! *****");
        } else {
            LOG.info("topology metrics is enabled.");
        }

        this.checkMetaThreadCycle = 20;
        this.flushMetricThreadCycle = 1;
        LOG.info("check meta thread freq: {} sec, flush metrics thread freq: {} sec",
                Integer.valueOf(this.checkMetaThreadCycle), Integer.valueOf(this.flushMetricThreadCycle));

        this.localMode = StormConfig.local_mode(this.conf);
        this.clusterName = ConfigExtension.getClusterName(this.conf);
        RefreshableComponents.registerRefreshable(this);
        LOG.info("done.");
    }

    /** Copies configuration, topology id and (for workers) port from the role object. */
    private void bindRole(Object role) {
        if (role instanceof WorkerData) {
            WorkerData worker = (WorkerData) role;
            this.conf = worker.getStormConf();
            this.topologyId = (String) this.conf.get("topology.id");
            this.port = worker.getPort().intValue();
            this.inTopology = true;
            return;
        }
        if (role instanceof NimbusData) {
            this.conf = ((NimbusData) role).getConf();
            this.topologyId = JStormMetrics.NIMBUS_METRIC_KEY;
            return;
        }
        if (role instanceof SupervisorManger) {
            this.conf = ((SupervisorManger) role).getConf();
            this.topologyId = JStormMetrics.SUPERVISOR_METRIC_KEY;
            JStormMetrics.setTopologyId(this.topologyId);
        }
    }

    /**
     * Test-only constructor: marks the reporter as a test instance so that metric
     * registration and upload are short-circuited. No configuration is loaded.
     */
    @VisibleForTesting
    JStormMetricsReporter() {
        // localMode, inTopology and client keep their default values (false / null).
        LOG.info("Successfully started jstorm metrics reporter for test.");
        this.checkMetaThreadCycle = 20;
        this.flushMetricThreadCycle = 1;
        this.test = true;
    }

    /**
     * Creates the metric-meta check loop and the metric flush loop. Does nothing when
     * metrics are disabled at the time of the call.
     */
    public void init() {
        if (!JStormMetrics.enabled) {
            return;
        }
        RunnableCallback metaCheck = new CheckMetricMetaThread();
        this.checkMetricMetaThread = new AsyncLoopThread(metaCheck);
        RunnableCallback flusher = new FlushMetricThread();
        this.flushMetricThread = new AsyncLoopThread(flusher);
    }

    /**
     * Cleans up the background loop threads created by {@link #init()}.
     *
     * <p>Cleanup is decided by whether the threads exist, not by the current value of
     * {@code JStormMetrics.enabled}: that flag can be flipped at runtime by
     * {@link #updateMetricConfig(Map)}, so it may no longer match the state seen by
     * {@code init()}. Gating on the flag would either dereference null threads (enabled
     * after a disabled init) or leave running threads behind (disabled after an enabled
     * init). Calling this method more than once is harmless.
     */
    public void shutdown() {
        if (this.checkMetricMetaThread != null) {
            this.checkMetricMetaThread.cleanup();
            this.checkMetricMetaThread = null;
        }
        if (this.flushMetricThread != null) {
            this.flushMetricThread.cleanup();
            this.flushMetricThread = null;
        }
    }

    /**
     * Registers the given metric names through the {@code MetricsRegister} helper.
     *
     * <p>Note: the decompiler widened this method from private to public.
     *
     * @param set names of the metrics to register
     * @return the map returned by the register helper, or a new empty map for a test instance
     */
    public Map<String, Long> registerMetrics(Set<String> set) {
        if (this.test) {
            return new HashMap<String, Long>();
        }
        return this.metricsRegister.registerMetrics(set);
    }

    /**
     * Computes all current metrics (accurately or approximately, depending on
     * {@code MetricUtils.metricAccurateCal}), wraps them together with topology id, host
     * and port, and uploads them when at least one metric is present. Does nothing for a
     * test instance. Exceptions are logged and swallowed.
     */
    public void uploadMetricData() {
        if (this.test) {
            return;
        }
        try {
            final long startMillis = System.currentTimeMillis();

            MetricInfo allMetrics;
            if (MetricUtils.metricAccurateCal) {
                allMetrics = JStormMetrics.computeAllMetrics();
            } else {
                allMetrics = JStormMetrics.approximateComputeAllMetrics();
            }

            WorkerUploadMetrics payload = new WorkerUploadMetrics();
            payload.set_topologyId(this.topologyId);
            payload.set_supervisorId(this.host);
            payload.set_port(this.port);
            payload.set_allMetrics(allMetrics);

            if (allMetrics.get_metrics_size() <= 0) {
                LOG.debug("No metrics to upload.");
                return;
            }
            uploadMetricData(payload);
            long elapsedMillis = System.currentTimeMillis() - startMillis;
            LOG.debug("Successfully upload worker metrics, size:{}, cost:{}",
                    Integer.valueOf(allMetrics.get_metrics_size()), Long.valueOf(elapsedMillis));
        } catch (Exception e) {
            LOG.error("Failed to upload worker metrics", e);
        }
    }

    /**
     * Sends the given worker metrics to their destination: inside a topology they are
     * emitted on the topology-master metrics control stream (bolt collector preferred,
     * spout collector otherwise); outside a topology they are uploaded through the
     * nimbus client.
     *
     * @param workerUploadMetrics the metrics payload to send
     */
    public void uploadMetricData(WorkerUploadMetrics workerUploadMetrics) {
        if (this.inTopology) {
            emitThroughCollector(workerUploadMetrics);
        } else {
            uploadThroughNimbus(workerUploadMetrics);
        }
    }

    /** Emits the payload via the bolt collector if set, else the spout collector, else warns. */
    private void emitThroughCollector(WorkerUploadMetrics payload) {
        OutputCollector bolt = this.boltOutput;
        if (bolt != null) {
            LOG.debug("emit metrics through bolt collector.");
            Values tuple = new Values(JStormServerUtils.getName(this.host, this.port), payload);
            ((BoltCollector) bolt.getDelegate()).emitCtrl(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID, null, tuple);
            return;
        }
        SpoutOutputCollector spout = this.spoutOutput;
        if (spout != null) {
            LOG.debug("emit metrics through spout collector.");
            Values tuple = new Values(JStormServerUtils.getName(this.host, this.port), payload);
            ((SpoutCollector) spout.getDelegate()).emitCtrl(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID, tuple, null);
            return;
        }
        LOG.warn("topology:{}, both spout/bolt collectors are null, don't know what to do...", this.topologyId);
    }

    /**
     * Uploads the payload's metrics through the nimbus client, creating the client on
     * demand. On any failure the client is cleaned up and dropped so that the next call
     * creates a fresh one.
     */
    private void uploadThroughNimbus(WorkerUploadMetrics payload) {
        LOG.debug("emit metrics through nimbus client.");
        TopologyMetric topologyMetric = MetricUtils.mkTopologyMetric();
        topologyMetric.set_workerMetric(payload.get_allMetrics());
        try {
            if (this.client == null) {
                LOG.warn("nimbus client is null...");
                this.client = new NimbusClientWrapper();
                this.client.init(this.conf);
            }
            this.client.getClient().uploadTopologyMetrics(this.topologyId, topologyMetric);
        } catch (Throwable th) {
            LOG.error("upload metrics error:", th);
            if (this.client != null) {
                this.client.cleanup();
                this.client = null;
            }
        }
    }

    /**
     * Stores the collector used for emitting metric control tuples. An
     * {@code OutputCollector} is kept as the bolt collector, a
     * {@code SpoutOutputCollector} as the spout collector; any other argument
     * (including null) is ignored.
     *
     * @param obj the collector to remember
     */
    public void setOutputCollector(Object obj) {
        if (obj instanceof OutputCollector) {
            this.boltOutput = (OutputCollector) obj;
            return;
        }
        if (obj instanceof SpoutOutputCollector) {
            this.spoutOutput = (SpoutOutputCollector) obj;
        }
    }

    /**
     * Applies the metric-related settings found in the given configuration to the global
     * metric state: debug flag and debug metric names, accurate-calculation switch,
     * per-metric enable/disable lists, histogram update interval, stream-metric switch and
     * the overall metrics switch.
     *
     * <p>All settings are read from {@code map}. The debug settings used to be read from
     * the reporter's own stored configuration, so a refresh with a new configuration never
     * updated them; they now follow the supplied map like every other setting.
     *
     * @param map the configuration to apply
     */
    public void updateMetricConfig(Map map) {
        JStormMetrics.setDebug(ConfigExtension.isEnableMetricDebug(map));
        JStormMetrics.addDebugMetrics(ConfigExtension.getDebugMetricNames(map));

        boolean accurate = ConfigExtension.getTopologyAccurateMetric(map).booleanValue();
        if (MetricUtils.metricAccurateCal != accurate) {
            MetricUtils.metricAccurateCal = accurate;
            LOG.info("switch topology metric accurate enable to {}", Boolean.valueOf(MetricUtils.metricAccurateCal));
        }

        String enabledMetricNames = ConfigExtension.getEnabledMetricNames(map);
        String disabledMetricNames = ConfigExtension.getDisabledMetricNames(map);
        if (enabledMetricNames != null || disabledMetricNames != null) {
            Set<String> enabledNames = toSet(enabledMetricNames, ",");
            Set<String> disabledNames = toSet(disabledMetricNames, ",");
            AsmMetricRegistry[] registries = {
                JStormMetrics.getTopologyMetrics(),
                JStormMetrics.getComponentMetrics(),
                JStormMetrics.getTaskMetrics(),
                JStormMetrics.getStreamMetrics(),
                JStormMetrics.getNettyMetrics(),
                JStormMetrics.getWorkerMetrics()
            };
            for (AsmMetricRegistry registry : registries) {
                for (AsmMetric metric : registry.getMetrics().values()) {
                    String shortName = metric.getShortName();
                    // A name listed in both sets ends up enabled: the enabled list wins.
                    if (enabledNames.contains(shortName)) {
                        metric.setEnabled(true);
                    } else if (disabledNames.contains(shortName)) {
                        metric.setEnabled(false);
                    }
                }
            }
        }

        long timerUpdateInterval = ConfigExtension.getTimerUpdateInterval(map);
        if (timerUpdateInterval != AsmHistogram.getUpdateInterval()) {
            AsmHistogram.setUpdateInterval(timerUpdateInterval);
        }

        boolean streamMetricsEnabled = ConfigExtension.isEnableStreamMetrics(map);
        if (streamMetricsEnabled != JStormMetrics.enableStreamMetrics) {
            JStormMetrics.enableStreamMetrics = streamMetricsEnabled;
            LOG.info("switch topology stream metric enable to {}", Boolean.valueOf(streamMetricsEnabled));
        }

        boolean metricsEnabled = ConfigExtension.isEnableMetrics(map);
        if (metricsEnabled != JStormMetrics.enabled) {
            JStormMetrics.enabled = metricsEnabled;
            LOG.info("switch topology metric enable to {}", Boolean.valueOf(metricsEnabled));
        }
    }

    /**
     * Splits a delimited string into a set of trimmed, non-blank tokens.
     *
     * @param str  the text to split; blank or null yields an empty set
     * @param str2 the separator, interpreted as a regular expression by {@code String.split}
     * @return a new mutable set holding the distinct tokens
     */
    private Set<String> toSet(String str, String str2) {
        Set<String> tokens = new HashSet<String>();
        if (StringUtils.isBlank(str)) {
            return tokens;
        }
        for (String piece : str.split(str2)) {
            String token = piece.trim();
            if (StringUtils.isBlank(token)) {
                continue;
            }
            tokens.add(token);
        }
        return tokens;
    }

    /**
     * {@code Refreshable} callback: re-applies the metric settings from the given
     * configuration by delegating to {@link #updateMetricConfig(Map)}. Also invoked once
     * from the role-based constructor with the reporter's own configuration.
     *
     * @param map the configuration to apply
     */
    @Override // com.alibaba.jstorm.config.Refreshable
    public void refresh(Map map) {
        updateMetricConfig(map);
    }

    /**
     * Assigns metric ids to locally known metrics. Each map key is looked up with
     * {@code JStormMetrics.find}; names that are not found are skipped.
     *
     * @param map metric name to metric id; null is treated as nothing to do
     */
    public void updateMetricMeta(Map<String, Long> map) {
        if (map == null) {
            return;
        }
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            String metricName = entry.getKey();
            AsmMetric metric = JStormMetrics.find(metricName);
            if (metric == null) {
                continue;
            }
            long metricId = entry.getValue().longValue();
            metric.setMetricId(metricId);
            LOG.debug("set metric id, {}:{}", metricName, Long.valueOf(metricId));
        }
    }
}
