package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.MetricSnapshot;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.MetricMeta;
import com.alibaba.jstorm.daemon.nimbus.metric.uploader.DefaultMetricUploader;
import com.alibaba.jstorm.daemon.nimbus.metric.uploader.MetricUploader;
import com.alibaba.jstorm.metric.AsmWindow;
import com.alibaba.jstorm.metric.DefaultMetricIDGenerator;
import com.alibaba.jstorm.metric.DefaultMetricQueryClient;
import com.alibaba.jstorm.metric.JStormMetricCache;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetaType;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricIDGenerator;
import com.alibaba.jstorm.metric.MetricQueryClient;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.metric.SimpleJStormMetric;
import com.alibaba.jstorm.metric.TimeTicker;
import com.alibaba.jstorm.metric.TopologyMetricContext;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import com.codahale.metrics.Gauge;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.class */
public class TopologyMetricsRunnable extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(TopologyMetricsRunnable.class);
    // Cache holding metric meta, sample rates and pending-upload entries; obtained from NimbusData.
    protected JStormMetricCache metricCache;
    // Cache key prefixes for a pending upload slot; the slot index is appended to form the key.
    private static final String PENDING_UPLOAD_METRIC_DATA = "__pending.upload.metrics__";
    private static final String PENDING_UPLOAD_METRIC_DATA_INFO = "__pending.upload.metrics.info__";
    // Per-slot states stored in metricStat:
    //   UNSET     - slot is free (set by markUploaded)
    //   SET       - slot holds data waiting for upload (set by markSet and at construction)
    //   UPLOADING - slot has been picked by the upload thread (set by markUploading)
    //   PRE_SET   - slot has been reserved by getAndPresetFirstEmptyIndex
    private static final int UNSET = 0;
    private static final int SET = 1;
    private static final int UPLOADING = 2;
    private static final int PRE_SET = 3;
    // One state per pending-upload slot; sized to maxPendingUploadMetrics (1 in local mode).
    protected final AtomicIntegerArray metricStat;
    protected StormClusterState stormClusterState;
    // Created in init(); stays null in local mode or until init() has run.
    protected MetricUploader metricUploader;
    // Shared shutdown flag taken from NimbusData; null in local mode.
    protected AtomicBoolean isShutdown;
    protected String clusterName;
    protected int maxPendingUploadMetrics;
    private final boolean localMode;
    private final NimbusData nimbusData;
    private MetricQueryClient metricQueryClient;
    // Runs the periodic cluster metrics merge scheduled in the constructor.
    private ScheduledExecutorService clusterMetricsUpdateExecutor;
    protected AsyncLoopThread refreshTopologiesThread;
    // Metric context per topology id (system topologies included).
    protected final ConcurrentMap<String, TopologyMetricContext> topologyMetricContexts = new ConcurrentHashMap();
    // Events consumed by run().
    protected final BlockingDeque<Event> queue = new LinkedBlockingDeque();
    private final Thread uploadControlThread = new MetricsUploadThread();
    private final Thread flushMetricMetaThread = new FlushMetricMetaThread();
    private final MetricIDGenerator metricIDGenerator = new DefaultMetricIDGenerator();

    /* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable$Event.class */
    /**
     * Base type of everything placed on the event queue of the enclosing runnable.
     * Subclasses add the payload that is specific to each kind of event.
     */
    public static class Event {
        /** Name of the cluster the event relates to. */
        public String clusterName;

        /** Id of the topology the event relates to. */
        public String topologyId;

        /** Time stamp carried by the event. */
        public long timestamp;

        protected Event() {
        }

        /**
         * Renders all fields reflectively using the short-prefix style.
         */
        @Override
        public String toString() {
            final ToStringStyle style = ToStringStyle.SHORT_PREFIX_STYLE;
            return ToStringBuilder.reflectionToString(this, style);
        }
    }

    /* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable$FlushMetricMetaThread.class */
    /**
     * Background thread that, while this nimbus is leader and an uploader is available,
     * repeatedly walks all topology metric contexts, merges the in-memory metric meta into
     * the cached meta, registers the meta with the uploader when its size changed, and
     * records the meta size in the cluster state.
     */
    class FlushMetricMetaThread extends Thread {
        public FlushMetricMetaThread() {
            setName("FlushMetricMetaThread");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            final TopologyMetricsRunnable outer = TopologyMetricsRunnable.this;
            while (outer.isShutdown != null && !outer.isShutdown.get()) {
                long passStart = System.currentTimeMillis();
                try {
                    if (outer.nimbusData.isLeader() && outer.metricUploader != null) {
                        for (Map.Entry<String, TopologyMetricContext> entry : outer.topologyMetricContexts.entrySet()) {
                            flushTopologyMeta(entry.getKey(), entry.getValue(), passStart);
                        }
                    }
                } catch (Exception e) {
                    TopologyMetricsRunnable.LOG.error("Error", e);
                }
                // Sleep on every pass, including failed ones: a persistent failure must not
                // turn this loop into a busy spin that floods the log.
                JStormUtils.sleepMs(15000L);
            }
        }

        /**
         * Flushes the metric meta of a single topology.
         *
         * @param topologyId id of the topology whose meta is flushed
         * @param context    metric context of that topology
         * @param passStart  start time of the current pass; used for the logged cost
         * @throws Exception if the cache, uploader or cluster state call fails
         */
        private void flushTopologyMeta(String topologyId, TopologyMetricContext context, long passStart) throws Exception {
            final TopologyMetricsRunnable outer = TopologyMetricsRunnable.this;
            Map<String, Long> meta = outer.metricCache.getMeta(topologyId);
            if (meta == null) {
                meta = new HashMap<>();
            }
            ConcurrentMap<String, Long> memMeta = context.getMemMeta();
            // Only merge when memory holds more entries than the cached copy.
            if (memMeta.size() > meta.size()) {
                meta.putAll(memMeta);
            }
            outer.metricCache.putMeta(topologyId, meta);
            int size = meta.size();
            if (size != context.getFlushedMetaNum()) {
                context.setFlushedMetaNum(size);
                outer.metricUploader.registerMetrics(outer.clusterName, topologyId, meta);
                TopologyMetricsRunnable.LOG.info("flush metric meta, topology:{}, total:{}, cost:{}.", new Object[]{topologyId, Integer.valueOf(size), Long.valueOf(System.currentTimeMillis() - passStart)});
            }
            outer.stormClusterState.set_topology_metric(topologyId, Integer.valueOf(size));
        }
    }

    /* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable$KillTopologyEvent.class */
    /**
     * Event processed by handleKillTopologyEvent: it is forwarded to the metric uploader
     * and the topology named by topologyId is removed.
     */
    public static class KillTopologyEvent extends Event {
    }

    /* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable$MetricsUploadThread.class */
    /**
     * Background thread that, while this nimbus is leader and an uploader is available,
     * picks the first slot waiting for upload, marks it as uploading and hands it to the
     * metric uploader.
     */
    class MetricsUploadThread extends Thread {
        public MetricsUploadThread() {
            setName("main-upload-control-thread");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            final TopologyMetricsRunnable outer = TopologyMetricsRunnable.this;
            while (outer.isShutdown != null && !outer.isShutdown.get()) {
                try {
                    if (outer.metricUploader != null && outer.nimbusData.isLeader()) {
                        int pendingIndex = outer.getFirstPendingUploadIndex();
                        if (pendingIndex >= 0) {
                            outer.markUploading(pendingIndex);
                            upload(outer.clusterName, pendingIndex);
                        }
                    }
                } catch (Exception e) {
                    TopologyMetricsRunnable.LOG.error("Error", e);
                }
                // Pause on every iteration, including failed ones, so that a persistent
                // failure cannot turn this loop into a busy spin.
                JStormUtils.sleepMs(5L);
            }
        }

        /**
         * Uploads the metrics stored in the given pending slot.
         *
         * @param str cluster name passed to the uploader
         * @param i   index of the pending slot
         * @return {@code true} when the slot was discarded (no info in the cache or topology
         *         not alive); otherwise the result of the uploader's upload call
         */
        public boolean upload(String str, int i) {
            final TopologyMetricsRunnable outer = TopologyMetricsRunnable.this;
            TopologyMetricDataInfo info = outer.getMetricDataInfoFromCache(i);
            if (info == null) {
                TopologyMetricsRunnable.LOG.warn("metric summary is null from cache idx:{}", Integer.valueOf(i));
                outer.markUploaded(i);
                return true;
            }
            String topologyId = info.topologyId;
            if (!outer.isTopologyAlive(topologyId)) {
                TopologyMetricsRunnable.LOG.warn("topology {} is not alive, skip sending metrics.", topologyId);
                outer.markUploaded(i);
                return true;
            }
            return outer.metricUploader.upload(str, topologyId, Integer.valueOf(i), info.toMap());
        }
    }

    /* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable$Refresh.class */
    /**
     * Event pushed by RefreshTopologiesThread and processed by handleRefreshEvent.
     */
    public static class Refresh extends Event {
    }

    /* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable$RefreshTopologiesThread.class */
    /**
     * Callback that enqueues a {@link Refresh} event each time it is run, unless the
     * enclosing runnable has no shutdown flag or is already shut down.
     */
    class RefreshTopologiesThread extends RunnableCallback {
        RefreshTopologiesThread() {
        }

        /** Pushes a refresh event while the enclosing runnable is still active. */
        @Override // com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
        public void run() {
            boolean active = TopologyMetricsRunnable.this.isShutdown != null
                    && !TopologyMetricsRunnable.this.isShutdown.get();
            if (active) {
                Refresh refreshEvent = new Refresh();
                TopologyMetricsRunnable.this.pushEvent(refreshEvent);
            }
        }

        /** Returns the constant 60. */
        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public Object getResult() {
            return Integer.valueOf(60);
        }

        /** Returns the fixed thread name used for this callback. */
        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public String getThreadName() {
            final String threadName = "RefreshThread";
            return threadName;
        }
    }

    /* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable$Remove.class */
    /**
     * Event processed by handleRemoveEvent, which removes the topology named by topologyId
     * when that id is not null.
     */
    public static class Remove extends Event {
    }

    /* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable$StartTopologyEvent.class */
    /**
     * Event processed by handleStartTopologyEvent.
     */
    public static class StartTopologyEvent extends Event {
        // Sample rate stored in the metric cache for the topology when the event is handled.
        public double sampleRate;
    }

    /* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable$TaskDeadEvent.class */
    /**
     * Event processed by handleTaskDeadEvent.
     */
    public static class TaskDeadEvent extends Event {
        // The distinct worker slots among the values are unregistered from the metric cache
        // when the event is handled. Keys are presumably task ids - verify against the producer.
        public Map<Integer, ResourceWorkerSlot> deadTasks;
    }

    /* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable$TaskStartEvent.class */
    /**
     * Event processed by handleTaskStartEvent.
     */
    public static class TaskStartEvent extends Event {
        // Not read by the handler in this class.
        public Assignment oldAssignment;
        // Its workers become the worker set of the topology's metric context.
        public Assignment newAssignment;
        // Not read by the handler in this class; presumably maps task id to component name - verify.
        public Map<Integer, String> task2Component;
    }

    /* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable$TopologyMetricDataInfo.class */
    /**
     * Serializable descriptor stored in the metric cache next to a pending metric upload.
     */
    public static class TopologyMetricDataInfo implements Serializable {
        private static final long serialVersionUID = 1303262512351757610L;

        /** Id of the topology the pending metrics belong to. */
        public String topologyId;

        /** Metric type value passed to the uploader. */
        public String type;

        /** Time stamp passed to the uploader. */
        public long timestamp;

        /**
         * Builds a new map carrying the time stamp and the type under the uploader's
         * {@code METRIC_TIME} and {@code METRIC_TYPE} keys.
         *
         * @return a freshly created, mutable map with those two entries
         */
        public Map<String, Object> toMap() {
            Map<String, Object> fields = new HashMap<>();
            Long boxedTimestamp = Long.valueOf(this.timestamp);
            fields.put(MetricUploader.METRIC_TIME, boxedTimestamp);
            fields.put(MetricUploader.METRIC_TYPE, this.type);
            return fields;
        }

        /**
         * Renders all fields reflectively using the short-prefix style.
         */
        @Override
        public String toString() {
            final ToStringStyle style = ToStringStyle.SHORT_PREFIX_STYLE;
            return ToStringBuilder.reflectionToString(this, style);
        }
    }

    /* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable$Update.class */
    /**
     * Event processed by handleUpdateEvent; also produced by mergeAndUploadClusterMetrics
     * for the merged cluster metrics.
     */
    public static class Update extends Event {
        // Metrics payload of the update.
        public TopologyMetric topologyMetrics;
    }

    /**
     * Creates the runnable.
     *
     * <p>In local mode only a single-slot state array is created and nothing else is set up.
     * Otherwise the constructor wires the cache, cluster state and shutdown flag from
     * {@code nimbusData}, restores the state of pending upload slots found in the cache,
     * refreshes the topology contexts, starts the refresh loop thread, schedules the periodic
     * cluster metrics merge and registers a heap memory gauge for nimbus.
     *
     * @param nimbusData nimbus runtime data this runnable is bound to
     * @throws RuntimeException if no cluster name is configured
     */
    public TopologyMetricsRunnable(NimbusData nimbusData) {
        setName(getClass().getSimpleName());
        this.nimbusData = nimbusData;
        this.localMode = nimbusData.isLocalMode();
        if (this.localMode) {
            this.metricStat = new AtomicIntegerArray(1);
            return;
        }
        LOG.info("create topology metrics runnable.");
        this.metricCache = nimbusData.getMetricCache();
        this.stormClusterState = nimbusData.getStormClusterState();
        this.isShutdown = nimbusData.getIsShutdown();
        this.clusterName = ConfigExtension.getClusterName(nimbusData.getConf());
        if (this.clusterName == null) {
            throw new RuntimeException("cluster.name property must be set in storm.yaml!");
        }
        this.maxPendingUploadMetrics = ConfigExtension.getMaxPendingMetricNum(nimbusData.getConf());
        this.metricStat = new AtomicIntegerArray(this.maxPendingUploadMetrics);
        // Slots that still have an info entry in the cache are waiting for upload again.
        int pending = 0;
        for (int slot = 0; slot < this.maxPendingUploadMetrics; slot++) {
            if (getMetricDataInfoFromCache(slot) != null) {
                this.metricStat.set(slot, SET);
                pending++;
            }
        }
        LOG.info("pending upload metrics: {}", Integer.valueOf(pending));
        refreshTopologies();
        this.refreshTopologiesThread = new AsyncLoopThread(new RefreshTopologiesThread());
        this.clusterMetricsUpdateExecutor = Executors.newSingleThreadScheduledExecutor();
        this.clusterMetricsUpdateExecutor.scheduleAtFixedRate(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                // An exception escaping a task scheduled at a fixed rate suppresses all
                // subsequent executions, so failures are logged here instead of propagated.
                try {
                    // Wait until the second offset reaches 55 before merging.
                    // NOTE(review): DEFAULT_ZMQ_MAX_QUEUE_MSG is used as the multiplier for the
                    // sleep time here; it presumably stands in for milliseconds per second - confirm.
                    int secOffset = TimeUtils.secOffset();
                    if (secOffset < 55) {
                        JStormUtils.sleepMs((55 - secOffset) * ConfigExtension.DEFAULT_ZMQ_MAX_QUEUE_MSG);
                    } else if (secOffset != 55) {
                        JStormUtils.sleepMs(((60 - secOffset) + 55) * ConfigExtension.DEFAULT_ZMQ_MAX_QUEUE_MSG);
                    }
                    TopologyMetricsRunnable.LOG.info("cluster metrics force upload.");
                    TopologyMetricsRunnable.this.mergeAndUploadClusterMetrics();
                } catch (Exception e) {
                    TopologyMetricsRunnable.LOG.error("cluster metrics force upload failed.", e);
                }
            }
        }, 5L, 60L, TimeUnit.SECONDS);
        JStormMetrics.registerWorkerGauge(JStormMetrics.NIMBUS_METRIC_KEY, MetricDef.MEMORY_USED, new AsmGauge(new Gauge<Double>() {
            // Must be named getValue to implement Gauge.
            @Override
            public Double getValue() {
                return Double.valueOf(JStormUtils.getJVMHeapMemory());
            }
        }));
    }

    /**
     * Creates and initializes the metric uploader and the metric query client configured for
     * this nimbus, then starts the upload control thread and the meta flush thread.
     * Does nothing in local mode.
     *
     * @throws RuntimeException if a configured class is of the wrong type, or if initializing
     *                          the uploader or the query client fails (the failure is the cause)
     */
    public void init() {
        if (this.localMode) {
            return;
        }

        String uploaderClass = ConfigExtension.getMetricUploaderClass(this.nimbusData.getConf());
        if (StringUtils.isBlank(uploaderClass)) {
            uploaderClass = DefaultMetricUploader.class.getName();
        }
        LOG.info("metric uploader class:{}", uploaderClass);
        Object uploader = Utils.newInstance(uploaderClass);
        if (!(uploader instanceof MetricUploader)) {
            throw new RuntimeException(uploaderClass + " isn't MetricUploader class ");
        }
        this.metricUploader = (MetricUploader) uploader;
        // Each initialization step is wrapped on its own so that a failure is wrapped once.
        try {
            this.metricUploader.init(this.nimbusData);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        LOG.info("Successfully init {}", uploaderClass);

        String queryClientClass = ConfigExtension.getMetricQueryClientClass(this.nimbusData.getConf());
        if (StringUtils.isBlank(queryClientClass)) {
            LOG.warn("use default metric query client class.");
            this.metricQueryClient = new DefaultMetricQueryClient();
        } else {
            LOG.info("metric query client class:{}", queryClientClass);
            Object queryClient = Utils.newInstance(queryClientClass);
            if (!(queryClient instanceof MetricQueryClient)) {
                throw new RuntimeException(queryClientClass + " isn't MetricQueryClient class ");
            }
            this.metricQueryClient = (MetricQueryClient) queryClient;
        }
        try {
            this.metricQueryClient.init(this.nimbusData.getConf());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        this.uploadControlThread.start();
        this.flushMetricMetaThread.start();
        LOG.info("init topology metric runnable done.");
    }

    /**
     * Stops the periodic cluster metrics task and cleans up the metric uploader.
     * Safe to call in local mode or before {@link #init()}, when neither has been created.
     */
    public void shutdown() {
        LOG.info("Begin to shutdown");
        // The executor is only created outside local mode; stop it so its thread does not outlive us.
        if (this.clusterMetricsUpdateExecutor != null) {
            this.clusterMetricsUpdateExecutor.shutdownNow();
        }
        // The uploader is null in local mode and until init() has run.
        if (this.metricUploader != null) {
            this.metricUploader.cleanup();
        }
        LOG.info("Successfully shutdown");
    }

    /**
     * Event loop: while not shut down and not in local mode, takes events from the queue and
     * dispatches each one to the handler matching its type. Failures of a single iteration are
     * logged (unless shutting down) and do not end the loop.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (this.isShutdown != null && !this.isShutdown.get() && !this.localMode) {
            try {
                if (this.metricUploader == null) {
                    // Uploader not created yet: wait briefly instead of spinning.
                    JStormUtils.sleepMs(1L);
                    continue;
                }
                Event event = this.queue.poll(1L, TimeUnit.MILLISECONDS);
                if (event == null) {
                    continue;
                }
                if (event instanceof Remove) {
                    handleRemoveEvent((Remove) event);
                } else if (event instanceof Update) {
                    handleUpdateEvent((Update) event);
                } else if (event instanceof Refresh) {
                    handleRefreshEvent((Refresh) event);
                } else if (event instanceof KillTopologyEvent) {
                    handleKillTopologyEvent((KillTopologyEvent) event);
                } else if (event instanceof StartTopologyEvent) {
                    handleStartTopologyEvent((StartTopologyEvent) event);
                } else if (event instanceof TaskDeadEvent) {
                    handleTaskDeadEvent((TaskDeadEvent) event);
                } else if (event instanceof TaskStartEvent) {
                    handleTaskStartEvent((TaskStartEvent) event);
                } else {
                    LOG.error("Unknown event type:{}", event.getClass());
                }
            } catch (Exception e) {
                // Covers handler failures and the InterruptedException of poll().
                if (!this.isShutdown.get()) {
                    LOG.error(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Tells whether a metric context is currently registered for the given topology.
     *
     * @param str topology id
     * @return {@code true} if a context exists for that id
     */
    public boolean isTopologyAlive(String str) {
        final boolean hasContext = this.topologyMetricContexts.containsKey(str);
        return hasContext;
    }

    /**
     * Reserves the first free pending-upload slot by atomically switching it from
     * {@code UNSET} to {@code PRE_SET}.
     *
     * @return index of the reserved slot, or -1 if no slot could be reserved
     */
    private int getAndPresetFirstEmptyIndex() {
        int slot = 0;
        while (slot < this.maxPendingUploadMetrics) {
            boolean looksFree = this.metricStat.get(slot) == UNSET;
            if (looksFree && this.metricStat.compareAndSet(slot, UNSET, PRE_SET)) {
                return slot;
            }
            slot++;
        }
        return -1;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Finds the first pending-upload slot whose state is {@code SET}.
     *
     * @return index of that slot, or -1 if there is none
     */
    public int getFirstPendingUploadIndex() {
        int slot = 0;
        while (slot < this.maxPendingUploadMetrics) {
            if (this.metricStat.get(slot) == SET) {
                return slot;
            }
            slot++;
        }
        return -1;
    }

    /**
     * Releases a pending-upload slot: removes its data and info entries from the metric cache
     * and sets its state back to {@code UNSET}.
     *
     * @param i index of the slot
     */
    public void markUploaded(int i) {
        final String dataKey = PENDING_UPLOAD_METRIC_DATA + i;
        this.metricCache.remove(dataKey);
        final String infoKey = PENDING_UPLOAD_METRIC_DATA_INFO + i;
        this.metricCache.remove(infoKey);
        this.metricStat.set(i, UNSET);
    }

    /**
     * Sets the state of a pending-upload slot to {@code UPLOADING}.
     *
     * @param i index of the slot
     */
    public void markUploading(int i) {
        final int newState = UPLOADING;
        this.metricStat.set(i, newState);
    }

    /**
     * Sets the state of a pending-upload slot to {@code SET}.
     *
     * @param i index of the slot
     */
    public void markSet(int i) {
        final int newState = SET;
        this.metricStat.set(i, newState);
    }

    /**
     * Reads the metric data stored in the cache for a pending-upload slot.
     *
     * @param i index of the slot
     * @return the cached value for that slot, cast to {@code TopologyMetric}
     */
    public TopologyMetric getMetricDataFromCache(int i) {
        final String cacheKey = PENDING_UPLOAD_METRIC_DATA + i;
        Object cached = this.metricCache.get(cacheKey);
        return (TopologyMetric) cached;
    }

    /**
     * Reads the metric data descriptor stored in the cache for a pending-upload slot.
     *
     * @param i index of the slot
     * @return the cached value for that slot, cast to {@code TopologyMetricDataInfo}
     */
    public TopologyMetricDataInfo getMetricDataInfoFromCache(int i) {
        final String cacheKey = PENDING_UPLOAD_METRIC_DATA_INFO + i;
        Object cached = this.metricCache.get(cacheKey);
        return (TopologyMetricDataInfo) cached;
    }

    /**
     * Appends an event to the tail of the event queue consumed by {@link #run()}.
     *
     * @param event event to enqueue
     */
    public void pushEvent(Event event) {
        // offerLast is what offer does on a deque.
        this.queue.offerLast(event);
    }

    /**
     * Resolves the metric id of each given metric name for a topology, generating and
     * recording a new id for names that have no valid id in the topology's in-memory meta.
     *
     * @param str topology id
     * @param set metric names to register
     * @return a new map from metric name to metric id; empty when the topology has no
     *         metric context
     */
    public Map<String, Long> registerMetrics(String str, Set<String> set) {
        TimeTicker ticker = new TimeTicker(TimeUnit.MILLISECONDS, true);
        Map<String, Long> registered = new HashMap<>();

        // A topology without a context has no meta to register against.
        TopologyMetricContext context = this.topologyMetricContexts.get(str);
        if (context == null) {
            LOG.warn("topology metric context does not exist for topology:{}, skip registering metrics.", str);
            return registered;
        }

        ConcurrentMap<String, Long> memMeta = context.getMemMeta();
        for (String metricName : set) {
            Long knownId = memMeta.get(metricName);
            if (knownId != null && MetricUtils.isValidId(knownId.longValue())) {
                registered.put(metricName, knownId);
                continue;
            }
            // When another thread registers the same name first, its id wins.
            Long generatedId = Long.valueOf(this.metricIDGenerator.genMetricId(metricName));
            Long previousId = memMeta.putIfAbsent(metricName, generatedId);
            registered.put(metricName, previousId == null ? generatedId : previousId);
        }
        LOG.info("register metrics, topology:{}, size:{}, cost:{}", new Object[]{str, Integer.valueOf(set.size()), Long.valueOf(ticker.stop())});
        return registered;
    }

    /**
     * Handles a {@link Remove} event by removing the topology it names, if any, and logging
     * the topology id.
     *
     * @param remove event to handle
     */
    public void handleRemoveEvent(Remove remove) {
        final String topologyId = remove.topologyId;
        boolean hasTopology = topologyId != null;
        if (hasTopology) {
            removeTopology(topologyId);
        }
        LOG.info("remove topology:{}.", topologyId);
    }

    /**
     * Drops everything kept for a topology: its entries in the metric cache, its cached
     * sample rate, and its metric context.
     *
     * @param str topology id
     */
    private void removeTopology(String str) {
        this.metricCache.removeTopology(str);
        this.metricCache.removeSampleRate(str);
        this.topologyMetricContexts.remove(str);
    }

    /**
     * Brings the set of metric contexts in line with the cluster: adds a context for every
     * system topology and every assigned topology that has none, and removes the contexts of
     * non-system topologies that no longer have an assignment.
     */
    public void refreshTopologies() {
        for (String sysTopology : JStormMetrics.SYS_TOPOLOGIES) {
            if (!this.topologyMetricContexts.containsKey(sysTopology)) {
                LOG.info("adding {} to metric context.", sysTopology);
                HashMap conf = new HashMap();
                if (sysTopology.equals(JStormMetrics.CLUSTER_METRIC_KEY)) {
                    conf.put(ConfigExtension.TOPOLOGY_METRIC_SAMPLE_RATE, Double.valueOf(1.0d));
                }
                this.topologyMetricContexts.putIfAbsent(sysTopology, new TopologyMetricContext(sysTopology, Sets.newHashSet(new ResourceWorkerSlot[]{new ResourceWorkerSlot()}), conf));
                syncMetaFromCache(sysTopology, this.topologyMetricContexts.get(sysTopology));
            }
        }
        try {
            Map<String, Assignment> assignments = Cluster.get_all_assignment(this.stormClusterState, null);
            for (Map.Entry<String, Assignment> entry : assignments.entrySet()) {
                String topologyId = entry.getKey();
                if (!this.topologyMetricContexts.containsKey(topologyId)) {
                    Assignment assignment = entry.getValue();
                    TopologyMetricContext context = new TopologyMetricContext(assignment.getWorkers());
                    context.setTaskNum(NimbusUtils.getTopologyTaskNum(assignment));
                    syncMetaFromCache(topologyId, context);
                    LOG.info("adding {} to metric context.", topologyId);
                    this.topologyMetricContexts.put(topologyId, context);
                }
            }
            // Collect first, remove afterwards.
            List<String> stale = new ArrayList<>();
            for (String topologyId : this.topologyMetricContexts.keySet()) {
                if (!JStormMetrics.SYS_TOPOLOGY_SET.contains(topologyId) && !assignments.containsKey(topologyId)) {
                    stale.add(topologyId);
                }
            }
            for (String topologyId : stale) {
                LOG.info("removing topology:{}", topologyId);
                removeTopology(topologyId);
            }
        } catch (Exception e) {
            // Keep the cause in the log so the failure can be diagnosed.
            LOG.warn("failed to get assignments", e);
        }
    }

    /**
     * Syncs metric meta from the remote store for every system topology that has a context
     * and for every assigned topology that has a context.
     */
    public void syncTopologyMeta() {
        for (String sysTopology : JStormMetrics.SYS_TOPOLOGIES) {
            if (this.topologyMetricContexts.containsKey(sysTopology)) {
                syncMetaFromRemote(sysTopology, this.topologyMetricContexts.get(sysTopology));
            }
        }
        try {
            Map<String, Assignment> assignments = Cluster.get_all_assignment(this.stormClusterState, null);
            for (Map.Entry<String, Assignment> entry : assignments.entrySet()) {
                String topologyId = entry.getKey();
                if (this.topologyMetricContexts.containsKey(topologyId)) {
                    Assignment assignment = entry.getValue();
                    // NOTE(review): the sync runs against a freshly built context that is not
                    // stored back into topologyMetricContexts; only the metric cache is updated
                    // by syncMetaFromRemote. Confirm this is intended.
                    TopologyMetricContext context = new TopologyMetricContext(assignment.getWorkers());
                    context.setTaskNum(NimbusUtils.getTopologyTaskNum(assignment));
                    syncMetaFromCache(topologyId, context);
                    syncMetaFromRemote(topologyId, context);
                }
            }
        } catch (Exception e) {
            // Keep the cause in the log so the failure can be diagnosed.
            LOG.warn("failed to get assignments", e);
        }
    }

    /**
     * Loads the cached metric meta of a topology into the context's in-memory meta, once.
     * Contexts already flagged as synced are left untouched.
     *
     * @param str                   topology id
     * @param topologyMetricContext context to fill
     */
    private void syncMetaFromCache(String str, TopologyMetricContext topologyMetricContext) {
        if (!topologyMetricContext.syncMeta()) {
            Map<String, Long> cachedMeta = this.metricCache.getMeta(str);
            if (cachedMeta != null) {
                ConcurrentMap<String, Long> memMeta = topologyMetricContext.getMemMeta();
                memMeta.putAll(cachedMeta);
            }
            topologyMetricContext.setSyncMeta(true);
        }
    }

    /**
     * When the meta size recorded in the cluster state differs from the size of the context's
     * in-memory meta, fetches the metric meta of every meta type through the query client,
     * adds the entries that are missing in memory, and writes the result to the metric cache.
     * Failures are logged and not propagated.
     *
     * @param str                   topology id
     * @param topologyMetricContext context whose in-memory meta is completed
     */
    private void syncMetaFromRemote(String str, TopologyMetricContext topologyMetricContext) {
        try {
            int memSize = topologyMetricContext.getMemMeta().size();
            Integer zkSize = (Integer) this.stormClusterState.get_topology_metric(str);
            if (zkSize == null || memSize == zkSize.intValue()) {
                return;
            }
            ConcurrentMap<String, Long> memMeta = topologyMetricContext.getMemMeta();
            for (MetaType metaType : MetaType.values()) {
                List<MetricMeta> remoteMetas = this.metricQueryClient.getMetricMeta(this.clusterName, str, metaType);
                if (remoteMetas == null) {
                    continue;
                }
                LOG.info("get remote metric meta, topology:{}, metaType:{}, mem:{}, zk:{}, new size:{}", new Object[]{str, metaType, Integer.valueOf(memSize), zkSize, Integer.valueOf(remoteMetas.size())});
                for (MetricMeta remoteMeta : remoteMetas) {
                    Long remoteId = Long.valueOf(remoteMeta.getId());
                    memMeta.putIfAbsent(remoteMeta.getFQN(), remoteId);
                }
            }
            this.metricCache.putMeta(str, memMeta);
        } catch (Exception e) {
            LOG.error("failed to sync remote meta", e);
        }
    }

    /**
     * Handles a kill-topology event: forwards it to the metric uploader, then removes the
     * topology it names.
     *
     * @param killTopologyEvent event to handle
     */
    protected void handleKillTopologyEvent(KillTopologyEvent killTopologyEvent) {
        this.metricUploader.sendEvent(this.clusterName, killTopologyEvent);
        removeTopology(killTopologyEvent.topologyId);
    }

    /**
     * Handles a start-topology event: stores the topology's sample rate in the metric cache,
     * forwards the event to the metric uploader, and creates an empty metric context for the
     * topology if it has none.
     *
     * @param startTopologyEvent event to handle
     */
    private void handleStartTopologyEvent(StartTopologyEvent startTopologyEvent) {
        this.metricCache.putSampleRate(startTopologyEvent.topologyId, startTopologyEvent.sampleRate);
        this.metricUploader.sendEvent(this.clusterName, startTopologyEvent);
        boolean alreadyKnown = this.topologyMetricContexts.containsKey(startTopologyEvent.topologyId);
        if (!alreadyKnown) {
            TopologyMetricContext freshContext = new TopologyMetricContext();
            this.topologyMetricContexts.put(startTopologyEvent.topologyId, freshContext);
        }
    }

    /**
     * Handles a task-dead event: forwards it to the metric uploader, then unregisters each
     * distinct worker slot of the dead tasks from the metric cache.
     *
     * @param taskDeadEvent event to handle
     */
    private void handleTaskDeadEvent(TaskDeadEvent taskDeadEvent) {
        this.metricUploader.sendEvent(this.clusterName, taskDeadEvent);
        // A set collapses tasks that share a worker slot.
        Set<ResourceWorkerSlot> deadWorkers = new HashSet<>();
        deadWorkers.addAll(taskDeadEvent.deadTasks.values());
        Iterator<ResourceWorkerSlot> workers = deadWorkers.iterator();
        while (workers.hasNext()) {
            ResourceWorkerSlot worker = workers.next();
            this.metricCache.unregisterWorker(taskDeadEvent.topologyId, worker.getHostname(), worker.getPort());
        }
    }

    /**
     * Handles a task-start event: installs the workers of the new assignment as the worker
     * set of the topology's metric context (creating the context if needed), then forwards
     * the event to the metric uploader.
     *
     * @param taskStartEvent event to handle
     */
    private void handleTaskStartEvent(TaskStartEvent taskStartEvent) {
        final Assignment newAssignment = taskStartEvent.newAssignment;
        TopologyMetricContext existing = this.topologyMetricContexts.get(taskStartEvent.topologyId);
        if (existing == null) {
            TopologyMetricContext created = new TopologyMetricContext();
            created.setWorkerSet(newAssignment.getWorkers());
            this.topologyMetricContexts.put(taskStartEvent.topologyId, created);
        } else {
            existing.setWorkerSet(newAssignment.getWorkers());
        }
        this.metricUploader.sendEvent(this.clusterName, taskStartEvent);
    }

    /**
     * Handles a refresh event: refreshes the topology contexts and, when this nimbus is not
     * the leader, additionally syncs the topology meta. Both steps log their cost; failures
     * are logged and not propagated.
     *
     * @param refresh event to handle (its content is not read)
     */
    public void handleRefreshEvent(Refresh refresh) {
        TimeTicker stopwatch = new TimeTicker(TimeUnit.MILLISECONDS, true);
        try {
            refreshTopologies();
            long refreshCost = stopwatch.stopAndRestart();
            LOG.info("refresh topologies, cost:{}", Long.valueOf(refreshCost));
            boolean leader = this.nimbusData.isLeader();
            if (leader) {
                return;
            }
            syncTopologyMeta();
            long syncCost = stopwatch.stop();
            LOG.info("sync topology meta, cost:{}", Long.valueOf(syncCost));
        } catch (Exception e) {
            LOG.error("handleRefreshEvent error:", e);
        }
    }

    /**
     * Looks up the metric context registered under the cluster metric key.
     *
     * @return that context, or {@code null} if none is registered
     */
    private TopologyMetricContext getClusterTopologyMetricContext() {
        final String clusterKey = JStormMetrics.CLUSTER_METRIC_KEY;
        return this.topologyMetricContexts.get(clusterKey);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Merges the cluster-level metrics, stamps every snapshot with its metric id, adds an
     * empty snapshot for every known metric that produced no data, and enqueues the result
     * as an {@link Update} event for the cluster metric key.
     *
     * <p>Does nothing (beyond a warning) when no cluster metric context is registered.
     */
    public void mergeAndUploadClusterMetrics() {
        TopologyMetricContext clusterContext = getClusterTopologyMetricContext();
        if (clusterContext == null) {
            LOG.warn("cluster metric context does not exist, skip merging cluster metrics.");
            return;
        }
        TopologyMetric clusterMetrics = clusterContext.mergeMetrics();
        if (clusterMetrics == null) {
            clusterMetrics = MetricUtils.mkTopologyMetric();
            clusterMetrics.set_topologyMetric(MetricUtils.mkMetricInfo());
        }
        MetricInfo metricInfo = clusterMetrics.get_topologyMetric();
        ConcurrentMap<String, Long> memMeta = clusterContext.getMemMeta();
        for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : metricInfo.get_metrics().entrySet()) {
            String metricName = entry.getKey();
            MetricType metricType = MetricUtils.metricType(metricName);
            Long metricId = memMeta.get(metricName);
            if (metricId == null) {
                // The metric was merged before its id was registered. Register it now, the
                // same way registerMetrics does, instead of failing on a null id.
                Long generatedId = Long.valueOf(this.metricIDGenerator.genMetricId(metricName));
                Long previousId = memMeta.putIfAbsent(metricName, generatedId);
                metricId = previousId == null ? generatedId : previousId;
            }
            for (MetricSnapshot snapshot : entry.getValue().values()) {
                snapshot.set_metricId(metricId.longValue());
                if (metricType == MetricType.HISTOGRAM) {
                    snapshot.set_points(new byte[0]);
                }
            }
        }
        // Known metrics without data in this round still get a snapshot.
        long now = System.currentTimeMillis();
        for (Map.Entry<String, Long> metaEntry : memMeta.entrySet()) {
            String metricName = metaEntry.getKey();
            if (!metricInfo.get_metrics().containsKey(metricName)) {
                Map<Integer, MetricSnapshot> windows = new HashMap<>();
                windows.put(AsmWindow.M1_WINDOW, new MetricSnapshot(metaEntry.getValue().longValue(), now, MetricUtils.metricType(metricName).getT()));
                metricInfo.put_to_metrics(metricName, windows);
            }
        }
        Update update = new Update();
        update.timestamp = System.currentTimeMillis();
        update.topologyMetrics = clusterMetrics;
        update.topologyId = JStormMetrics.CLUSTER_METRIC_KEY;
        pushEvent(update);
        LOG.info("send update event for cluster metrics, size : {}", Integer.valueOf(metricInfo.get_metrics_size()));
    }

    /**
     * Feeds the topology-level metrics of a topology into the cluster metric context: every
     * metric is renamed to its cluster-level name, its snapshots are deep-copied into a new
     * metric info that is added to the cluster context's memory cache, and the cluster-level
     * names are registered. Nothing happens when the topology has no topology-level metrics.
     *
     * @param str            id of the topology the metrics come from
     * @param topologyMetric metrics of that topology
     */
    private void updateClusterMetrics(String str, TopologyMetric topologyMetric) {
        if (topologyMetric.get_topologyMetric().get_metrics_size() <= 0) {
            return;
        }
        TopologyMetricContext clusterContext = getClusterTopologyMetricContext();
        MetricInfo topologyLevel = topologyMetric.get_topologyMetric();
        MetricInfo clusterLevel = MetricUtils.mkMetricInfo();
        Set<String> clusterNames = new HashSet<>();
        for (Map.Entry<String, Map<Integer, MetricSnapshot>> metricEntry : topologyLevel.get_metrics().entrySet()) {
            String clusterName = MetricUtils.topo2clusterName(metricEntry.getKey());
            boolean histogram = MetricUtils.metricType(clusterName) == MetricType.HISTOGRAM;
            Map<Integer, MetricSnapshot> copies = new HashMap<>();
            for (Map.Entry<Integer, MetricSnapshot> windowEntry : metricEntry.getValue().entrySet()) {
                MetricSnapshot source = windowEntry.getValue();
                copies.put(windowEntry.getKey(), source.m220deepCopy());
                if (histogram) {
                    // Clears the points of the source snapshot; the copy made above keeps them.
                    source.set_points(new byte[0]);
                }
            }
            clusterLevel.put_to_metrics(clusterName, copies);
            clusterNames.add(clusterName);
        }
        clusterContext.addToMemCache(str, clusterLevel);
        registerMetrics(JStormMetrics.CLUSTER_METRIC_KEY, clusterNames);
    }

    /**
     * Processes a metric update event for one topology id.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>If no metric context is known for the id, a warning is logged and the event is
     *       dropped.</li>
     *   <li>Unless the id is {@link JStormMetrics#CLUSTER_METRIC_KEY}, the metrics are passed to
     *       {@code updateClusterMetrics}.</li>
     *   <li>The metrics are stored in the metric cache via {@code putMetricData}.</li>
     *   <li>A pending-upload index is requested from {@code getAndPresetFirstEmptyIndex()}; a
     *       negative index logs an error and stops here.</li>
     *   <li>A {@code TopologyMetricDataInfo} (id, timestamp, upload type) and the metrics are
     *       put into the cache under the pending-upload keys for that index, and the index is
     *       marked with {@code markSet}.</li>
     * </ol>
     *
     * <p>Upload type: nimbus and cluster ids always use {@code METRIC_TYPE_TOPLOGY}. Other ids
     * use {@code METRIC_TYPE_TASK} when topology + component metrics are empty,
     * {@code METRIC_TYPE_ALL} when additionally any task/worker/netty/stream metric exists, and
     * {@code METRIC_TYPE_TOPLOGY} otherwise.
     *
     * @param update the event carrying topology id, timestamp and metrics
     */
    public void handleUpdateEvent(Update update) {
        final TopologyMetric metrics = update.topologyMetrics;
        final String topologyId = update.topologyId;

        if (this.topologyMetricContexts.containsKey(topologyId)) {
            if (!JStormMetrics.CLUSTER_METRIC_KEY.equals(topologyId)) {
                updateClusterMetrics(topologyId, metrics);
            }
            this.metricCache.putMetricData(topologyId, metrics);

            final int slot = getAndPresetFirstEmptyIndex();
            if (slot >= 0) {
                final TopologyMetricDataInfo info = new TopologyMetricDataInfo();
                // Number of metrics counted for the log line; stays 0 for nimbus/cluster ids.
                int total = 0;
                info.topologyId = topologyId;
                info.timestamp = update.timestamp;

                if (topologyId.equals(JStormMetrics.NIMBUS_METRIC_KEY)
                        || topologyId.equals(JStormMetrics.CLUSTER_METRIC_KEY)) {
                    info.type = MetricUploader.METRIC_TYPE_TOPLOGY;
                } else {
                    total = metrics.get_topologyMetric().get_metrics_size()
                            + metrics.get_componentMetric().get_metrics_size();
                    if (total <= 0) {
                        info.type = MetricUploader.METRIC_TYPE_TASK;
                        total += metrics.get_taskMetric().get_metrics_size();
                    } else {
                        final int fineGrained = metrics.get_taskMetric().get_metrics_size()
                                + metrics.get_workerMetric().get_metrics_size()
                                + metrics.get_nettyMetric().get_metrics_size()
                                + metrics.get_streamMetric().get_metrics_size();
                        if (fineGrained <= 0) {
                            info.type = MetricUploader.METRIC_TYPE_TOPLOGY;
                        } else {
                            total += fineGrained;
                            info.type = MetricUploader.METRIC_TYPE_ALL;
                        }
                    }
                }

                // Store both entries before marking the index as set.
                this.metricCache.put(PENDING_UPLOAD_METRIC_DATA_INFO + slot, info);
                this.metricCache.put(PENDING_UPLOAD_METRIC_DATA + slot, metrics);
                markSet(slot);
                LOG.info("put metric data to local cache, topology:{}, idx:{}, total:{}", new Object[]{topologyId, slot, total});
            } else {
                LOG.error("exceeding maxPendingUploadMetrics, skip caching metrics data for topology:{}", topologyId);
            }
        } else {
            LOG.warn("topology {} has been killed or has not started, skip update.", topologyId);
        }
    }

    /**
     * Builds a {@link TopologyMetric} for a topology from the metric cache.
     *
     * <p>Topology, component and worker metrics are read from the cache. The topology metric is
     * the last element of its cached list, while component and worker metrics use the first
     * element of theirs. Any list that is empty, as well as the task, stream and netty metrics,
     * is filled with one shared empty {@link MetricInfo} created by
     * {@link MetricUtils#mkMetricInfo()}.
     *
     * <p>The elapsed time (nanoseconds divided by 1000) is reported to the nimbus histogram
     * named {@code "getTopologyMetric"} exactly once per call, whether the lookup succeeds or
     * throws.
     *
     * @param str id of the topology to look up
     * @return a metric object with all six metric groups set
     */
    public TopologyMetric getTopologyMetric(String str) {
        final long startNanos = System.nanoTime();
        try {
            final TopologyMetric result = new TopologyMetric();
            final List<MetricInfo> topologyData = this.metricCache.getMetricData(str, MetaType.TOPOLOGY);
            final List<MetricInfo> componentData = this.metricCache.getMetricData(str, MetaType.COMPONENT);
            final List<MetricInfo> workerData = this.metricCache.getMetricData(str, MetaType.WORKER);

            // One empty instance is shared by every group that has no cached data.
            final MetricInfo empty = MetricUtils.mkMetricInfo();

            // Topology metrics take the LAST cached entry; component/worker take the FIRST.
            result.set_topologyMetric(topologyData.isEmpty() ? empty : topologyData.get(topologyData.size() - 1));
            result.set_componentMetric(componentData.isEmpty() ? empty : componentData.get(0));
            result.set_workerMetric(workerData.isEmpty() ? empty : workerData.get(0));

            result.set_taskMetric(empty);
            result.set_streamMetric(empty);
            result.set_nettyMetric(empty);
            return result;
        } finally {
            // finally (instead of a duplicated call plus catch-Throwable/rethrow) guarantees
            // the timing is recorded once on both the normal and the exceptional path.
            SimpleJStormMetric.updateNimbusHistogram("getTopologyMetric", Long.valueOf((System.nanoTime() - startNanos) / 1000));
        }
    }

    /**
     * Formats a worker slot name as {@code <str>:<num>}.
     *
     * <p>A {@code null} argument is rendered as the text {@code "null"}.
     *
     * @param str first part of the name (placed before the colon)
     * @param num second part of the name (placed after the colon)
     * @return the two parts joined by a single colon
     */
    public static String getWorkerSlotName(String str, Integer num) {
        final StringBuilder slotName = new StringBuilder();
        slotName.append(str);
        slotName.append(":");
        slotName.append(num);
        return slotName.toString();
    }
}
