package com.alibaba.jstorm.daemon.nimbus.metric.refresh;

import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.common.metric.MetricMeta;
import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
import com.alibaba.jstorm.daemon.nimbus.metric.CheckMetricEvent;
import com.alibaba.jstorm.daemon.nimbus.metric.MetricEvent;
import com.alibaba.jstorm.daemon.nimbus.metric.assignment.RemoveTopologyEvent;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetaType;
import com.alibaba.jstorm.metric.TimeTicker;
import com.alibaba.jstorm.metric.TopologyMetricContext;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.utils.Pair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.com.google.common.collect.Sets;

/* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/metric/refresh/RefreshEvent.class */
public class RefreshEvent extends MetricEvent {
    private static final Logger LOG = LoggerFactory.getLogger(RefreshEvent.class);
    private static final int SYNC_REMOTE_META_TIME_SEC = 600;

    @Override // com.alibaba.jstorm.daemon.nimbus.metric.MetricEvent, java.lang.Runnable
    public void run() {
        refreshTopologies();
    }

    public void refreshTopologies() {
        TimeTicker timeTicker = new TimeTicker(TimeUnit.MILLISECONDS, true);
        try {
            doRefreshTopologies();
            LOG.debug("Refresh topologies, cost:{}", Long.valueOf(timeTicker.stopAndRestart()));
            if (!this.context.getNimbusData().isLeader()) {
                syncTopologyMeta();
                LOG.debug("Sync topology meta, cost:{}", Long.valueOf(timeTicker.stop()));
            } else if (this.context.getNimbusData().uptime() < 600) {
                syncSysMetaFromRemote();
            }
        } catch (Exception e) {
            LOG.error("handleRefreshEvent error:", e);
        }
    }

    private void doRefreshTopologies() {
        for (String str : JStormMetrics.SYS_TOPOLOGIES) {
            if (!this.context.getTopologyMetricContexts().containsKey(str)) {
                LOG.info("adding {} to metric context.", str);
                HashMap hashMap = new HashMap();
                if (str.equals(JStormMetrics.CLUSTER_METRIC_KEY)) {
                    hashMap.put(ConfigExtension.TOPOLOGY_METRIC_SAMPLE_RATE, Double.valueOf(1.0d));
                }
                this.context.getTopologyMetricContexts().putIfAbsent(str, new TopologyMetricContext(str, Sets.newHashSet(new ResourceWorkerSlot()), hashMap));
                syncMetaFromCache(str, this.context.getTopologyMetricContexts().get(str));
                syncMetaFromRemote(str, this.context.getTopologyMetricContexts().get(str));
            }
        }
        try {
            Map<String, Assignment> map = Cluster.get_all_assignment(this.context.getStormClusterState(), null);
            for (Map.Entry<String, Assignment> entry : map.entrySet()) {
                String key = entry.getKey();
                Assignment value = entry.getValue();
                TopologyMetricContext topologyMetricContext = this.context.getTopologyMetricContexts().get(key);
                if (topologyMetricContext == null) {
                    TopologyMetricContext topologyMetricContext2 = new TopologyMetricContext(value.getWorkers());
                    topologyMetricContext2.setTaskNum(NimbusUtils.getTopologyTaskNum(value));
                    syncMetaFromCache(key, topologyMetricContext2);
                    LOG.info("adding {} to metric context.", key);
                    this.context.getTopologyMetricContexts().put(key, topologyMetricContext2);
                } else {
                    boolean z = false;
                    if (topologyMetricContext.getTaskNum() != NimbusUtils.getTopologyTaskNum(value)) {
                        z = true;
                        topologyMetricContext.setTaskNum(NimbusUtils.getTopologyTaskNum(value));
                    }
                    if (!value.getWorkers().equals(topologyMetricContext.getWorkerSet())) {
                        z = true;
                        topologyMetricContext.setWorkerSet(value.getWorkers());
                    }
                    topologyMetricContext.setSyncMeta(!z);
                }
            }
            ArrayList<String> arrayList = new ArrayList();
            for (String str2 : this.context.getTopologyMetricContexts().keySet()) {
                if (!JStormMetrics.SYS_TOPOLOGY_SET.contains(str2) && !map.containsKey(str2)) {
                    arrayList.add(str2);
                }
            }
            for (String str3 : arrayList) {
                LOG.info("removing topology:{}", str3);
                RemoveTopologyEvent.pushEvent(str3);
            }
        } catch (Exception e) {
            LOG.warn("Failed to get assignments");
        }
    }

    public void syncTopologyMeta() {
        syncSysMetaFromRemote();
        for (Map.Entry<String, TopologyMetricContext> entry : this.context.getTopologyMetricContexts().entrySet()) {
            String key = entry.getKey();
            TopologyMetricContext value = entry.getValue();
            if (!JStormMetrics.SYS_TOPOLOGY_SET.contains(key)) {
                try {
                    syncMetaFromCache(key, value);
                    syncMetaFromRemote(key, value);
                } catch (Exception e) {
                    LOG.warn("failed to sync meta for topology:{}", key);
                }
            }
        }
    }

    private void syncMetaFromCache(String str, TopologyMetricContext topologyMetricContext) {
        if (topologyMetricContext.syncMeta()) {
            return;
        }
        Map<String, Long> meta = this.context.getMetricCache().getMeta(str);
        if (meta != null) {
            topologyMetricContext.getMemMeta().putAll(meta);
        }
        topologyMetricContext.setSyncMeta(true);
    }

    private void syncSysMetaFromRemote() {
        for (String str : JStormMetrics.SYS_TOPOLOGIES) {
            if (this.context.getTopologyMetricContexts().containsKey(str)) {
                syncMetaFromRemote(str, this.context.getTopologyMetricContexts().get(str));
            }
        }
    }

    private void syncMetaFromRemote(String str, TopologyMetricContext topologyMetricContext) {
        try {
            int size = topologyMetricContext.getMemMeta().size();
            HashSet hashSet = new HashSet();
            ArrayList arrayList = new ArrayList();
            ConcurrentMap<String, Long> memMeta = topologyMetricContext.getMemMeta();
            for (MetaType metaType : MetaType.values()) {
                List<MetricMeta> metricMeta = this.context.getMetricQueryClient().getMetricMeta(this.context.getClusterName(), str, metaType);
                if (metricMeta != null) {
                    LOG.debug("get remote metric meta, topology:{}, metaType:{}, local mem:{}, remote:{}", new Object[]{str, metaType, Integer.valueOf(size), Integer.valueOf(metricMeta.size())});
                    for (MetricMeta metricMeta2 : metricMeta) {
                        String fqn = metricMeta2.getFQN();
                        if (hashSet.contains(fqn)) {
                            Long l = memMeta.get(fqn);
                            if (l != null && l.longValue() != metricMeta2.getId()) {
                                LOG.warn("duplicate remote metric meta:{}, will double-check...", fqn);
                                arrayList.add(new Pair(metricMeta2, l));
                            }
                        } else {
                            LOG.debug("overwrite local from remote:{}", fqn);
                            hashSet.add(fqn);
                            memMeta.put(fqn, Long.valueOf(metricMeta2.getId()));
                        }
                    }
                }
            }
            this.context.getMetricCache().putMeta(str, memMeta);
            if (arrayList.size() > 0) {
                CheckMetricEvent.pushEvent(str, topologyMetricContext, arrayList);
            }
        } catch (Exception e) {
            LOG.error("failed to sync remote meta", e);
        }
    }
}
