/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.MetricSnapshot;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.MetricMeta;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
import com.alibaba.jstorm.daemon.nimbus.metric.uploader.DefaultMetricUploader;
import com.alibaba.jstorm.daemon.nimbus.metric.uploader.MetricUploader;
import com.alibaba.jstorm.metric.AsmWindow;
import com.alibaba.jstorm.metric.DefaultMetricIDGenerator;
import com.alibaba.jstorm.metric.DefaultMetricQueryClient;
import com.alibaba.jstorm.metric.JStormMetricCache;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetaType;
import com.alibaba.jstorm.metric.MetricIDGenerator;
import com.alibaba.jstorm.metric.MetricQueryClient;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.metric.SimpleJStormMetric;
import com.alibaba.jstorm.metric.TimeTicker;
import com.alibaba.jstorm.metric.TopologyMetricContext;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import com.codahale.metrics.Gauge;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopologyMetricsRunnable
extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(TopologyMetricsRunnable.class);
    protected JStormMetricCache metricCache;
    protected final ConcurrentMap<String, TopologyMetricContext> topologyMetricContexts = new ConcurrentHashMap<String, TopologyMetricContext>();
    protected final BlockingDeque<Event> queue = new LinkedBlockingDeque<Event>();
    private static final String PENDING_UPLOAD_METRIC_DATA = "__pending.upload.metrics__";
    private static final String PENDING_UPLOAD_METRIC_DATA_INFO = "__pending.upload.metrics.info__";
    private static final int UNSET = 0;
    private static final int SET = 1;
    private static final int UPLOADING = 2;
    private static final int PRE_SET = 3;
    protected final AtomicIntegerArray metricStat;
    protected StormClusterState stormClusterState;
    protected MetricUploader metricUploader;
    protected AtomicBoolean isShutdown;
    protected String clusterName;
    protected int maxPendingUploadMetrics;
    private final boolean localMode;
    private final NimbusData nimbusData;
    private MetricQueryClient metricQueryClient;
    private ScheduledExecutorService clusterMetricsUpdateExecutor;
    protected AsyncLoopThread refreshTopologiesThread;
    private final Thread uploadControlThread = new MetricsUploadThread();
    private final Thread flushMetricMetaThread = new FlushMetricMetaThread();
    private final MetricIDGenerator metricIDGenerator = new DefaultMetricIDGenerator();

    public TopologyMetricsRunnable(NimbusData nimbusData) {
        this.setName(this.getClass().getSimpleName());
        this.nimbusData = nimbusData;
        this.localMode = nimbusData.isLocalMode();
        if (this.localMode) {
            this.metricStat = new AtomicIntegerArray(1);
            return;
        }
        LOG.info("create topology metrics runnable.");
        this.metricCache = nimbusData.getMetricCache();
        this.stormClusterState = nimbusData.getStormClusterState();
        this.isShutdown = nimbusData.getIsShutdown();
        this.clusterName = ConfigExtension.getClusterName(nimbusData.getConf());
        if (this.clusterName == null) {
            throw new RuntimeException("cluster.name property must be set in storm.yaml!");
        }
        this.maxPendingUploadMetrics = ConfigExtension.getMaxPendingMetricNum(nimbusData.getConf());
        this.metricStat = new AtomicIntegerArray(this.maxPendingUploadMetrics);
        int cnt = 0;
        for (int i = 0; i < this.maxPendingUploadMetrics; ++i) {
            TopologyMetricDataInfo obj = this.getMetricDataInfoFromCache(i);
            if (obj == null) continue;
            this.metricStat.set(i, 1);
            ++cnt;
        }
        LOG.info("pending upload metrics: {}", (Object)cnt);
        this.refreshTopologies();
        this.refreshTopologiesThread = new AsyncLoopThread(new RefreshTopologiesThread());
        this.clusterMetricsUpdateExecutor = Executors.newSingleThreadScheduledExecutor();
        this.clusterMetricsUpdateExecutor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                int offset;
                int secOffset = TimeUtils.secOffset();
                if (secOffset < (offset = 55)) {
                    JStormUtils.sleepMs((offset - secOffset) * 1000);
                } else if (secOffset != offset) {
                    JStormUtils.sleepMs((60 - secOffset + offset) * 1000);
                }
                LOG.info("cluster metrics force upload.");
                TopologyMetricsRunnable.this.mergeAndUploadClusterMetrics();
            }
        }, 5L, 60L, TimeUnit.SECONDS);
        JStormMetrics.registerWorkerGauge("__NIMBUS__", "MemoryUsed", new AsmGauge(new Gauge<Double>(){

            public Double getValue() {
                return JStormUtils.getJVMHeapMemory();
            }
        }));
    }

    public void init() {
        if (this.localMode) {
            return;
        }
        String metricUploadClass = ConfigExtension.getMetricUploaderClass(this.nimbusData.getConf());
        if (StringUtils.isBlank((String)metricUploadClass)) {
            metricUploadClass = DefaultMetricUploader.class.getName();
        }
        LOG.info("metric uploader class:{}", (Object)metricUploadClass);
        Object instance = Utils.newInstance(metricUploadClass);
        if (!(instance instanceof MetricUploader)) {
            throw new RuntimeException(metricUploadClass + " isn't MetricUploader class ");
        }
        this.metricUploader = (MetricUploader)instance;
        try {
            this.metricUploader.init(this.nimbusData);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        LOG.info("Successfully init {}", (Object)metricUploadClass);
        String metricQueryClientClass = ConfigExtension.getMetricQueryClientClass(this.nimbusData.getConf());
        if (!StringUtils.isBlank((String)metricQueryClientClass)) {
            LOG.info("metric query client class:{}", (Object)metricQueryClientClass);
            this.metricQueryClient = (MetricQueryClient)Utils.newInstance(metricQueryClientClass);
        } else {
            LOG.warn("use default metric query client class.");
            this.metricQueryClient = new DefaultMetricQueryClient();
        }
        try {
            this.metricQueryClient.init(this.nimbusData.getConf());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        this.uploadControlThread.start();
        this.flushMetricMetaThread.start();
        LOG.info("init topology metric runnable done.");
    }

    public void shutdown() {
        LOG.info("Begin to shutdown");
        this.metricUploader.cleanup();
        LOG.info("Successfully shutdown");
    }

    @Override
    public void run() {
        while (this.isShutdown != null && !this.isShutdown.get()) {
            if (this.localMode) {
                return;
            }
            try {
                Event event;
                if (this.metricUploader == null || (event = this.queue.poll(1L, TimeUnit.MILLISECONDS)) == null) continue;
                if (event instanceof Remove) {
                    this.handleRemoveEvent((Remove)event);
                    continue;
                }
                if (event instanceof Update) {
                    this.handleUpdateEvent((Update)event);
                    continue;
                }
                if (event instanceof Refresh) {
                    this.handleRefreshEvent((Refresh)event);
                    continue;
                }
                if (event instanceof KillTopologyEvent) {
                    this.handleKillTopologyEvent((KillTopologyEvent)event);
                    continue;
                }
                if (event instanceof StartTopologyEvent) {
                    this.handleStartTopologyEvent((StartTopologyEvent)event);
                    continue;
                }
                if (event instanceof TaskDeadEvent) {
                    this.handleTaskDeadEvent((TaskDeadEvent)event);
                    continue;
                }
                if (event instanceof TaskStartEvent) {
                    this.handleTaskStartEvent((TaskStartEvent)event);
                    continue;
                }
                LOG.error("Unknown event type:{}", event.getClass());
            }
            catch (Exception e) {
                if (this.isShutdown.get()) continue;
                LOG.error(e.getMessage(), (Throwable)e);
            }
        }
    }

    public boolean isTopologyAlive(String topologyId) {
        return this.topologyMetricContexts.containsKey(topologyId);
    }

    private int getAndPresetFirstEmptyIndex() {
        for (int i = 0; i < this.maxPendingUploadMetrics; ++i) {
            if (this.metricStat.get(i) != 0 || !this.metricStat.compareAndSet(i, 0, 3)) continue;
            return i;
        }
        return -1;
    }

    private int getFirstPendingUploadIndex() {
        for (int i = 0; i < this.maxPendingUploadMetrics; ++i) {
            if (this.metricStat.get(i) != 1) continue;
            return i;
        }
        return -1;
    }

    public void markUploaded(int idx) {
        this.metricCache.remove(PENDING_UPLOAD_METRIC_DATA + idx);
        this.metricCache.remove(PENDING_UPLOAD_METRIC_DATA_INFO + idx);
        this.metricStat.set(idx, 0);
    }

    public void markUploading(int idx) {
        this.metricStat.set(idx, 2);
    }

    public void markSet(int idx) {
        this.metricStat.set(idx, 1);
    }

    public TopologyMetric getMetricDataFromCache(int idx) {
        return (TopologyMetric)this.metricCache.get(PENDING_UPLOAD_METRIC_DATA + idx);
    }

    public TopologyMetricDataInfo getMetricDataInfoFromCache(int idx) {
        return (TopologyMetricDataInfo)this.metricCache.get(PENDING_UPLOAD_METRIC_DATA_INFO + idx);
    }

    public void pushEvent(Event cmd) {
        this.queue.offer(cmd);
    }

    public Map<String, Long> registerMetrics(String topologyId, Set<String> metricNames) {
        TimeTicker ticker = new TimeTicker(TimeUnit.MILLISECONDS, true);
        ConcurrentMap<String, Long> memMeta = ((TopologyMetricContext)this.topologyMetricContexts.get(topologyId)).getMemMeta();
        HashMap<String, Long> ret = new HashMap<String, Long>();
        for (String metricName : metricNames) {
            Long id = (Long)memMeta.get(metricName);
            if (id != null && MetricUtils.isValidId(id)) {
                ret.put(metricName, id);
                continue;
            }
            id = this.metricIDGenerator.genMetricId(metricName);
            Long old = memMeta.putIfAbsent(metricName, id);
            if (old == null) {
                ret.put(metricName, id);
                continue;
            }
            ret.put(metricName, old);
        }
        long cost = ticker.stop();
        LOG.info("register metrics, topology:{}, size:{}, cost:{}", new Object[]{topologyId, metricNames.size(), cost});
        return ret;
    }

    public void handleRemoveEvent(Remove event) {
        String topologyId = event.topologyId;
        if (topologyId != null) {
            this.removeTopology(topologyId);
        }
        LOG.info("remove topology:{}.", (Object)topologyId);
    }

    private void removeTopology(String topologyId) {
        this.metricCache.removeTopology(topologyId);
        this.metricCache.removeSampleRate(topologyId);
        this.topologyMetricContexts.remove(topologyId);
    }

    public void refreshTopologies() {
        Map<String, Assignment> assignMap;
        for (String topology : JStormMetrics.SYS_TOPOLOGIES) {
            if (this.topologyMetricContexts.containsKey(topology)) continue;
            LOG.info("adding {} to metric context.", (Object)topology);
            HashMap<String, Double> conf = new HashMap<String, Double>();
            if (topology.equals("__CLUSTER__")) {
                conf.put("topology.metric.sample.rate", 1.0);
            }
            HashSet workerSlot = Sets.newHashSet((Object[])new ResourceWorkerSlot[]{new ResourceWorkerSlot()});
            TopologyMetricContext metricContext = new TopologyMetricContext(topology, workerSlot, conf);
            this.topologyMetricContexts.putIfAbsent(topology, metricContext);
            this.syncMetaFromCache(topology, (TopologyMetricContext)this.topologyMetricContexts.get(topology));
        }
        try {
            assignMap = Cluster.get_all_assignment(this.stormClusterState, null);
            for (String topologyId : assignMap.keySet()) {
                if (this.topologyMetricContexts.containsKey(topologyId)) continue;
                Assignment assignment = assignMap.get(topologyId);
                TopologyMetricContext metricContext = new TopologyMetricContext(assignment.getWorkers());
                metricContext.setTaskNum(NimbusUtils.getTopologyTaskNum(assignment));
                this.syncMetaFromCache(topologyId, metricContext);
                LOG.info("adding {} to metric context.", (Object)topologyId);
                this.topologyMetricContexts.put(topologyId, metricContext);
            }
        }
        catch (Exception e1) {
            LOG.warn("failed to get assignments");
            return;
        }
        ArrayList<String> removing = new ArrayList<String>();
        for (String topologyId : this.topologyMetricContexts.keySet()) {
            if (JStormMetrics.SYS_TOPOLOGY_SET.contains(topologyId) || assignMap.containsKey(topologyId)) continue;
            removing.add(topologyId);
        }
        for (String topologyId : removing) {
            LOG.info("removing topology:{}", (Object)topologyId);
            this.removeTopology(topologyId);
        }
    }

    public void syncTopologyMeta() {
        for (String topology : JStormMetrics.SYS_TOPOLOGIES) {
            if (!this.topologyMetricContexts.containsKey(topology)) continue;
            this.syncMetaFromRemote(topology, (TopologyMetricContext)this.topologyMetricContexts.get(topology));
        }
        try {
            Map<String, Assignment> assignMap = Cluster.get_all_assignment(this.stormClusterState, null);
            for (String topologyId : assignMap.keySet()) {
                if (!this.topologyMetricContexts.containsKey(topologyId)) continue;
                Assignment assignment = assignMap.get(topologyId);
                TopologyMetricContext metricContext = new TopologyMetricContext(assignment.getWorkers());
                metricContext.setTaskNum(NimbusUtils.getTopologyTaskNum(assignment));
                this.syncMetaFromCache(topologyId, metricContext);
                this.syncMetaFromRemote(topologyId, metricContext);
            }
        }
        catch (Exception e1) {
            LOG.warn("failed to get assignments");
        }
    }

    private void syncMetaFromCache(String topologyId, TopologyMetricContext context) {
        if (!context.syncMeta()) {
            Map<String, Long> meta = this.metricCache.getMeta(topologyId);
            if (meta != null) {
                context.getMemMeta().putAll(meta);
            }
            context.setSyncMeta(true);
        }
    }

    private void syncMetaFromRemote(String topologyId, TopologyMetricContext context) {
        try {
            int memSize = context.getMemMeta().size();
            Integer zkSize = (Integer)this.stormClusterState.get_topology_metric(topologyId);
            if (zkSize != null && memSize != zkSize) {
                ConcurrentMap<String, Long> memMeta = context.getMemMeta();
                for (MetaType metaType : MetaType.values()) {
                    List<MetricMeta> metaList = this.metricQueryClient.getMetricMeta(this.clusterName, topologyId, metaType);
                    if (metaList == null) continue;
                    LOG.info("get remote metric meta, topology:{}, metaType:{}, mem:{}, zk:{}, new size:{}", new Object[]{topologyId, metaType, memSize, zkSize, metaList.size()});
                    for (MetricMeta meta : metaList) {
                        memMeta.putIfAbsent(meta.getFQN(), meta.getId());
                    }
                }
                this.metricCache.putMeta(topologyId, memMeta);
            }
        }
        catch (Exception ex) {
            LOG.error("failed to sync remote meta", (Throwable)ex);
        }
    }

    protected void handleKillTopologyEvent(KillTopologyEvent event) {
        this.metricUploader.sendEvent(this.clusterName, event);
        this.removeTopology(event.topologyId);
    }

    private void handleStartTopologyEvent(StartTopologyEvent event) {
        this.metricCache.putSampleRate(event.topologyId, event.sampleRate);
        this.metricUploader.sendEvent(this.clusterName, event);
        if (!this.topologyMetricContexts.containsKey(event.topologyId)) {
            TopologyMetricContext metricContext = new TopologyMetricContext();
            this.topologyMetricContexts.put(event.topologyId, metricContext);
        }
    }

    private void handleTaskDeadEvent(TaskDeadEvent event) {
        this.metricUploader.sendEvent(this.clusterName, event);
        HashSet<ResourceWorkerSlot> workers = new HashSet<ResourceWorkerSlot>();
        workers.addAll(event.deadTasks.values());
        for (ResourceWorkerSlot worker : workers) {
            this.metricCache.unregisterWorker(event.topologyId, worker.getHostname(), worker.getPort());
        }
    }

    private void handleTaskStartEvent(TaskStartEvent event) {
        Assignment assignment = event.newAssignment;
        TopologyMetricContext metricContext = (TopologyMetricContext)this.topologyMetricContexts.get(event.topologyId);
        if (metricContext != null) {
            metricContext.setWorkerSet(assignment.getWorkers());
        } else {
            metricContext = new TopologyMetricContext();
            metricContext.setWorkerSet(assignment.getWorkers());
            this.topologyMetricContexts.put(event.topologyId, metricContext);
        }
        this.metricUploader.sendEvent(this.clusterName, event);
    }

    public void handleRefreshEvent(Refresh dummy) {
        TimeTicker ticker = new TimeTicker(TimeUnit.MILLISECONDS, true);
        try {
            this.refreshTopologies();
            LOG.info("refresh topologies, cost:{}", (Object)ticker.stopAndRestart());
            if (!this.nimbusData.isLeader()) {
                this.syncTopologyMeta();
                LOG.info("sync topology meta, cost:{}", (Object)ticker.stop());
            }
        }
        catch (Exception ex) {
            LOG.error("handleRefreshEvent error:", (Throwable)ex);
        }
    }

    private TopologyMetricContext getClusterTopologyMetricContext() {
        return (TopologyMetricContext)this.topologyMetricContexts.get("__CLUSTER__");
    }

    private void mergeAndUploadClusterMetrics() {
        TopologyMetricContext context = this.getClusterTopologyMetricContext();
        TopologyMetric tpMetric = context.mergeMetrics();
        if (tpMetric == null) {
            tpMetric = MetricUtils.mkTopologyMetric();
            tpMetric.set_topologyMetric(MetricUtils.mkMetricInfo());
        }
        MetricInfo clusterMetrics = tpMetric.get_topologyMetric();
        ConcurrentMap<String, Long> metricNames = context.getMemMeta();
        for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : clusterMetrics.get_metrics().entrySet()) {
            String metricName = entry.getKey();
            MetricType metricType = MetricUtils.metricType(metricName);
            Long metricId = (Long)metricNames.get(metricName);
            for (Map.Entry<Integer, MetricSnapshot> metric : entry.getValue().entrySet()) {
                MetricSnapshot snapshot = metric.getValue();
                snapshot.set_metricId(metricId);
                if (metricType != MetricType.HISTOGRAM) continue;
                snapshot.set_points(new byte[0]);
            }
        }
        long ts = System.currentTimeMillis();
        for (Map.Entry entry : metricNames.entrySet()) {
            String name = (String)entry.getKey();
            if (clusterMetrics.get_metrics().containsKey(name)) continue;
            HashMap<Integer, MetricSnapshot> metric = new HashMap<Integer, MetricSnapshot>();
            MetricType type = MetricUtils.metricType(name);
            metric.put(AsmWindow.M1_WINDOW, new MetricSnapshot((Long)entry.getValue(), ts, type.getT()));
            clusterMetrics.put_to_metrics(name, metric);
        }
        Update event = new Update();
        event.timestamp = System.currentTimeMillis();
        event.topologyMetrics = tpMetric;
        event.topologyId = "__CLUSTER__";
        this.pushEvent(event);
        LOG.info("send update event for cluster metrics, size : {}", (Object)clusterMetrics.get_metrics_size());
    }

    private void updateClusterMetrics(String topologyId, TopologyMetric tpMetric) {
        if (tpMetric.get_topologyMetric().get_metrics_size() > 0) {
            TopologyMetricContext context = this.getClusterTopologyMetricContext();
            MetricInfo topologyMetrics = tpMetric.get_topologyMetric();
            MetricInfo clusterMetrics = MetricUtils.mkMetricInfo();
            HashSet<String> metricNames = new HashSet<String>();
            for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : topologyMetrics.get_metrics().entrySet()) {
                String metricName = MetricUtils.topo2clusterName(entry.getKey());
                MetricType metricType = MetricUtils.metricType(metricName);
                HashMap<Integer, MetricSnapshot> winData = new HashMap<Integer, MetricSnapshot>();
                for (Map.Entry<Integer, MetricSnapshot> entryData : entry.getValue().entrySet()) {
                    MetricSnapshot snapshot = entryData.getValue().deepCopy();
                    winData.put(entryData.getKey(), snapshot);
                    if (metricType != MetricType.HISTOGRAM) continue;
                    entryData.getValue().set_points(new byte[0]);
                }
                clusterMetrics.put_to_metrics(metricName, winData);
                metricNames.add(metricName);
            }
            context.addToMemCache(topologyId, clusterMetrics);
            this.registerMetrics("__CLUSTER__", metricNames);
        }
    }

    public void handleUpdateEvent(Update event) {
        TopologyMetric topologyMetrics = event.topologyMetrics;
        String topologyId = event.topologyId;
        if (this.topologyMetricContexts.containsKey(topologyId)) {
            if (!"__CLUSTER__".equals(topologyId)) {
                this.updateClusterMetrics(topologyId, topologyMetrics);
            }
            this.metricCache.putMetricData(topologyId, topologyMetrics);
            int idx = this.getAndPresetFirstEmptyIndex();
            if (idx >= 0) {
                TopologyMetricDataInfo summary = new TopologyMetricDataInfo();
                int total = 0;
                summary.topologyId = topologyId;
                summary.timestamp = event.timestamp;
                if (topologyId.equals("__NIMBUS__") || topologyId.equals("__CLUSTER__")) {
                    summary.type = "TP";
                } else if ((total += topologyMetrics.get_topologyMetric().get_metrics_size() + topologyMetrics.get_componentMetric().get_metrics_size()) > 0) {
                    int sub = topologyMetrics.get_taskMetric().get_metrics_size() + topologyMetrics.get_workerMetric().get_metrics_size() + topologyMetrics.get_nettyMetric().get_metrics_size() + topologyMetrics.get_streamMetric().get_metrics_size();
                    if (sub > 0) {
                        total += sub;
                        summary.type = "ALL";
                    } else {
                        summary.type = "TP";
                    }
                } else {
                    summary.type = "TASK";
                    total += topologyMetrics.get_taskMetric().get_metrics_size();
                }
                this.metricCache.put(PENDING_UPLOAD_METRIC_DATA_INFO + idx, summary);
                this.metricCache.put(PENDING_UPLOAD_METRIC_DATA + idx, topologyMetrics);
                this.markSet(idx);
                LOG.info("put metric data to local cache, topology:{}, idx:{}, total:{}", new Object[]{topologyId, idx, total});
            } else {
                LOG.error("exceeding maxPendingUploadMetrics, skip caching metrics data for topology:{}", (Object)topologyId);
            }
        } else {
            LOG.warn("topology {} has been killed or has not started, skip update.", (Object)topologyId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TopologyMetric getTopologyMetric(String topologyId) {
        long start = System.nanoTime();
        try {
            TopologyMetric ret = new TopologyMetric();
            List<MetricInfo> topologyMetrics = this.metricCache.getMetricData(topologyId, MetaType.TOPOLOGY);
            List<MetricInfo> componentMetrics = this.metricCache.getMetricData(topologyId, MetaType.COMPONENT);
            List<MetricInfo> workerMetrics = this.metricCache.getMetricData(topologyId, MetaType.WORKER);
            MetricInfo dummy = MetricUtils.mkMetricInfo();
            if (topologyMetrics.size() > 0) {
                ret.set_topologyMetric(topologyMetrics.get(topologyMetrics.size() - 1));
            } else {
                ret.set_topologyMetric(dummy);
            }
            if (componentMetrics.size() > 0) {
                ret.set_componentMetric(componentMetrics.get(0));
            } else {
                ret.set_componentMetric(dummy);
            }
            if (workerMetrics.size() > 0) {
                ret.set_workerMetric(workerMetrics.get(0));
            } else {
                ret.set_workerMetric(dummy);
            }
            ret.set_taskMetric(dummy);
            ret.set_streamMetric(dummy);
            ret.set_nettyMetric(dummy);
            TopologyMetric topologyMetric = ret;
            return topologyMetric;
        }
        finally {
            long end = System.nanoTime();
            SimpleJStormMetric.updateNimbusHistogram("getTopologyMetric", (end - start) / 1000L);
        }
    }

    public static String getWorkerSlotName(String hostname, Integer port) {
        return hostname + ":" + port;
    }

    public static class TaskStartEvent
    extends Event {
        public Assignment oldAssignment;
        public Assignment newAssignment;
        public Map<Integer, String> task2Component;
    }

    public static class TaskDeadEvent
    extends Event {
        public Map<Integer, ResourceWorkerSlot> deadTasks;
    }

    public static class StartTopologyEvent
    extends Event {
        public double sampleRate;
    }

    public static class KillTopologyEvent
    extends Event {
    }

    public static class Refresh
    extends Event {
    }

    public static class Remove
    extends Event {
    }

    public static class Update
    extends Event {
        public TopologyMetric topologyMetrics;
    }

    public static class Event {
        public String clusterName;
        public String topologyId;
        public long timestamp;

        protected Event() {
        }

        public String toString() {
            return ToStringBuilder.reflectionToString((Object)this, (ToStringStyle)ToStringStyle.SHORT_PREFIX_STYLE);
        }
    }

    public static class TopologyMetricDataInfo
    implements Serializable {
        private static final long serialVersionUID = 1303262512351757610L;
        public String topologyId;
        public String type;
        public long timestamp;

        public Map<String, Object> toMap() {
            HashMap<String, Object> ret = new HashMap<String, Object>();
            ret.put("metric.timestamp", this.timestamp);
            ret.put("metric.type", this.type);
            return ret;
        }

        public String toString() {
            return ToStringBuilder.reflectionToString((Object)this, (ToStringStyle)ToStringStyle.SHORT_PREFIX_STYLE);
        }
    }

    class FlushMetricMetaThread
    extends Thread {
        public FlushMetricMetaThread() {
            this.setName("FlushMetricMetaThread");
        }

        @Override
        public void run() {
            while (TopologyMetricsRunnable.this.isShutdown != null && !TopologyMetricsRunnable.this.isShutdown.get()) {
                long start = System.currentTimeMillis();
                try {
                    if (TopologyMetricsRunnable.this.nimbusData.isLeader() && TopologyMetricsRunnable.this.metricUploader != null) {
                        for (Map.Entry entry : TopologyMetricsRunnable.this.topologyMetricContexts.entrySet()) {
                            ConcurrentMap<String, Long> memMeta;
                            String topologyId = (String)entry.getKey();
                            TopologyMetricContext metricContext = (TopologyMetricContext)entry.getValue();
                            Map<String, Long> cachedMeta = TopologyMetricsRunnable.this.metricCache.getMeta(topologyId);
                            if (cachedMeta == null) {
                                cachedMeta = new HashMap<String, Long>();
                            }
                            if ((memMeta = metricContext.getMemMeta()).size() > cachedMeta.size()) {
                                cachedMeta.putAll(memMeta);
                            }
                            TopologyMetricsRunnable.this.metricCache.putMeta(topologyId, cachedMeta);
                            int curSize = cachedMeta.size();
                            if (curSize != metricContext.getFlushedMetaNum()) {
                                metricContext.setFlushedMetaNum(curSize);
                                TopologyMetricsRunnable.this.metricUploader.registerMetrics(TopologyMetricsRunnable.this.clusterName, topologyId, cachedMeta);
                                LOG.info("flush metric meta, topology:{}, total:{}, cost:{}.", new Object[]{topologyId, curSize, System.currentTimeMillis() - start});
                            }
                            TopologyMetricsRunnable.this.stormClusterState.set_topology_metric(topologyId, curSize);
                        }
                    }
                    JStormUtils.sleepMs(15000L);
                }
                catch (Exception ex) {
                    LOG.error("Error", (Throwable)ex);
                }
            }
        }
    }

    class MetricsUploadThread
    extends Thread {
        public MetricsUploadThread() {
            this.setName("main-upload-control-thread");
        }

        @Override
        public void run() {
            while (TopologyMetricsRunnable.this.isShutdown != null && !TopologyMetricsRunnable.this.isShutdown.get()) {
                try {
                    int idx;
                    if (TopologyMetricsRunnable.this.metricUploader != null && TopologyMetricsRunnable.this.nimbusData.isLeader() && (idx = TopologyMetricsRunnable.this.getFirstPendingUploadIndex()) >= 0) {
                        TopologyMetricsRunnable.this.markUploading(idx);
                        this.upload(TopologyMetricsRunnable.this.clusterName, idx);
                    }
                    JStormUtils.sleepMs(5L);
                }
                catch (Exception ex) {
                    LOG.error("Error", (Throwable)ex);
                }
            }
        }

        public boolean upload(String clusterName, int idx) {
            TopologyMetricDataInfo summary = TopologyMetricsRunnable.this.getMetricDataInfoFromCache(idx);
            if (summary == null) {
                LOG.warn("metric summary is null from cache idx:{}", (Object)idx);
                TopologyMetricsRunnable.this.markUploaded(idx);
                return true;
            }
            String topologyId = summary.topologyId;
            if (!TopologyMetricsRunnable.this.isTopologyAlive(topologyId)) {
                LOG.warn("topology {} is not alive, skip sending metrics.", (Object)topologyId);
                TopologyMetricsRunnable.this.markUploaded(idx);
                return true;
            }
            return TopologyMetricsRunnable.this.metricUploader.upload(clusterName, topologyId, idx, summary.toMap());
        }
    }

    class RefreshTopologiesThread
    extends RunnableCallback {
        RefreshTopologiesThread() {
        }

        @Override
        public void run() {
            if (TopologyMetricsRunnable.this.isShutdown != null && !TopologyMetricsRunnable.this.isShutdown.get()) {
                TopologyMetricsRunnable.this.pushEvent(new Refresh());
            }
        }

        @Override
        public Object getResult() {
            return 60;
        }

        @Override
        public String getThreadName() {
            return "RefreshThread";
        }
    }
}

