package com.alibaba.jstorm.task.master.metrics;

import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.MetricSnapshot;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.NimbusClientWrapper;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.metric.MetaType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.metric.TopologyMetricContext;
import com.alibaba.jstorm.task.error.ErrorConstants;
import com.alibaba.jstorm.task.master.TMHandler;
import com.alibaba.jstorm.task.master.TopologyMasterContext;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.com.google.common.collect.Maps;

/* loaded from: input_file:com/alibaba/jstorm/task/master/metrics/MetricsUploader.class */
/**
 * {@link TMHandler} that merges the metrics held by the {@link TopologyMetricContext}
 * and uploads them through a {@link NimbusClientWrapper}.
 *
 * <p>Payloads whose total metric count reaches {@link #MAX_BATCH_SIZE} are split: the
 * topology/component metrics go first, then worker, task, stream and netty metrics are
 * sent in separate calls of at most {@link #MAX_BATCH_SIZE} entries each.
 */
public class MetricsUploader implements TMHandler {
    private static final Logger LOG = LoggerFactory.getLogger(MetricsUploader.class);
    private static final Logger metricLogger = TopologyMetricContext.LOG;

    /** Maximum number of metric entries sent in a single upload call. */
    public static final int MAX_BATCH_SIZE = 10000;

    /**
     * Target value of {@code TimeUtils.secOffset()} at which an upload is attempted
     * (presumably the second within the current minute; confirm against TimeUtils).
     */
    private static final int UPLOAD_TIME_OFFSET_SEC = 35;

    private TopologyMetricContext topologyMetricContext;
    private Map conf;
    private String topologyId;
    private NimbusClientWrapper client;
    private StormClusterState zkCluster;
    private TopologyContext context;

    /** Placeholder passed for every metric slot that a given upload call does not carry. */
    private final MetricInfo dummy = MetricUtils.mkMetricInfo();

    /** Guards (re)creation of {@link #client}. */
    private final Object lock = new Object();

    /** Set while a merge-and-upload cycle is running so overlapping cycles are skipped. */
    private final AtomicBoolean uploading = new AtomicBoolean(false);

    /**
     * Captures the collaborators this handler needs from the topology master context.
     *
     * @param topologyMasterContext source of the topology context, metric context,
     *                              configuration, topology id and ZK cluster state
     */
    @Override
    public void init(TopologyMasterContext topologyMasterContext) {
        this.context = topologyMasterContext.getContext();
        this.topologyMetricContext = topologyMasterContext.getTopologyMetricContext();
        this.conf = topologyMasterContext.getConf();
        this.topologyId = topologyMasterContext.getTopologyId();
        this.zkCluster = topologyMasterContext.getZkCluster();
    }

    /**
     * Sleeps until {@code TimeUtils.secOffset()} would reach
     * {@link #UPLOAD_TIME_OFFSET_SEC}, then uploads if any worker has reported metrics.
     *
     * @param obj unused event payload
     * @throws Exception propagated from merging or uploading the metrics
     */
    @Override
    public void process(Object obj) throws Exception {
        int secOffset = TimeUtils.secOffset();
        if (secOffset < UPLOAD_TIME_OFFSET_SEC) {
            // Target offset is still ahead: wait out the remainder.
            JStormUtils.sleepMs((UPLOAD_TIME_OFFSET_SEC - secOffset) * 1000);
        } else if (secOffset != UPLOAD_TIME_OFFSET_SEC) {
            // Already past the target: wait for it to come around again (60-based wrap).
            JStormUtils.sleepMs(((60 - secOffset) + UPLOAD_TIME_OFFSET_SEC) * 1000);
        }
        if (this.topologyMetricContext.getUploadedWorkerNum() > 0) {
            metricLogger.info("force upload metrics.");
            mergeAndUploadMetrics();
        }
    }

    /** Nothing to release here; the handler holds no resources that this method closes. */
    @Override
    public void cleanup() {
    }

    /**
     * Merges the pending metrics and uploads the result, unless another cycle is
     * already in progress.
     *
     * @throws Exception propagated from {@code mergeMetrics()} or {@link #uploadMetrics}
     */
    private void mergeAndUploadMetrics() throws Exception {
        if (!this.uploading.compareAndSet(false, true)) {
            LOG.warn("another thread is already uploading, skip...");
            return;
        }
        try {
            // Re-check: the worker count may have changed since the caller looked at it.
            if (this.topologyMetricContext.getUploadedWorkerNum() > 0) {
                TopologyMetric merged = this.topologyMetricContext.mergeMetrics();
                if (merged != null) {
                    uploadMetrics(merged);
                }
            }
        } finally {
            // Always release the flag; otherwise one failure would block every later upload.
            this.uploading.set(false);
        }
    }

    /**
     * Uploads one merged metric snapshot, in a single call when it is small enough and
     * in per-type batches otherwise. Failures are logged, the client is cleaned up and
     * a task error is reported; the exception itself is not rethrown.
     *
     * @param topologyMetric merged metrics to upload; {@code null} is ignored
     * @throws Exception if reporting the task error fails
     */
    private void uploadMetrics(TopologyMetric topologyMetric) throws Exception {
        if (topologyMetric == null) {
            return;
        }
        long start = System.currentTimeMillis();
        try {
            synchronized (this.lock) {
                if (this.client == null || !this.client.isValid()) {
                    this.client = new NimbusClientWrapper();
                    this.client.init(this.conf);
                }
            }
            MetricInfo topologyMetrics = topologyMetric.get_topologyMetric();
            MetricInfo componentMetrics = topologyMetric.get_componentMetric();
            MetricInfo taskMetrics = topologyMetric.get_taskMetric();
            MetricInfo streamMetrics = topologyMetric.get_streamMetric();
            MetricInfo workerMetrics = topologyMetric.get_workerMetric();
            MetricInfo nettyMetrics = topologyMetric.get_nettyMetric();

            int totalSize = topologyMetrics.get_metrics_size()
                    + componentMetrics.get_metrics_size()
                    + taskMetrics.get_metrics_size()
                    + streamMetrics.get_metrics_size()
                    + workerMetrics.get_metrics_size()
                    + nettyMetrics.get_metrics_size();

            if (totalSize < MAX_BATCH_SIZE) {
                // Small enough: ship everything in one call.
                this.client.getClient().uploadTopologyMetrics(this.topologyId,
                        new TopologyMetric(topologyMetrics, componentMetrics, workerMetrics,
                                taskMetrics, streamMetrics, nettyMetrics));
            } else {
                // Too large: topology/component first, then each remaining type in batches.
                this.client.getClient().uploadTopologyMetrics(this.topologyId,
                        new TopologyMetric(topologyMetrics, componentMetrics, this.dummy,
                                this.dummy, this.dummy, this.dummy));
                batchUploadMetrics(workerMetrics, MetaType.WORKER);
                batchUploadMetrics(taskMetrics, MetaType.TASK);
                batchUploadMetrics(streamMetrics, MetaType.STREAM);
                batchUploadMetrics(nettyMetrics, MetaType.NETTY);
            }
        } catch (Exception e) {
            LOG.error("Failed to upload worker metrics ", e);
            if (this.client != null) {
                this.client.cleanup();
            }
            this.zkCluster.report_task_error(this.context.getTopologyId(), this.context.getThisTaskId(),
                    "Failed to upload worker metrics", ErrorConstants.WARN, ErrorConstants.CODE_USER);
        }
        metricLogger.info("upload metrics, cost:{}", Long.valueOf(System.currentTimeMillis() - start));
    }

    /**
     * Uploads the metrics of one type, splitting them into chunks of
     * {@link #MAX_BATCH_SIZE} entries when there are more than that.
     *
     * @param metricInfo metrics of a single type
     * @param metaType   which slot of the uploaded {@link TopologyMetric} they belong to
     * @throws Exception propagated from the upload call
     */
    private void batchUploadMetrics(MetricInfo metricInfo, MetaType metaType) throws Exception {
        if (metricInfo.get_metrics_size() <= MAX_BATCH_SIZE) {
            uploadParts(metricInfo, metaType);
            return;
        }
        Map<String, Map<Integer, MetricSnapshot>> allMetrics = metricInfo.get_metrics();
        // The batch map and its carrier are reused across chunks; each chunk is uploaded
        // before the map is cleared for the next one.
        Map<String, Map<Integer, MetricSnapshot>> batch = Maps.newHashMapWithExpectedSize(MAX_BATCH_SIZE);
        MetricInfo part = new MetricInfo();
        for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : allMetrics.entrySet()) {
            batch.put(entry.getKey(), entry.getValue());
            if (batch.size() >= MAX_BATCH_SIZE) {
                part.set_metrics(batch);
                uploadParts(part, metaType);
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            // Remainder that did not fill a whole chunk.
            part.set_metrics(batch);
            uploadParts(part, metaType);
        }
    }

    /**
     * Uploads {@code metricInfo} in the slot selected by {@code metaType}, with the
     * placeholder in every other slot. Types other than TASK, STREAM, WORKER and NETTY
     * are ignored.
     *
     * @param metricInfo metrics to upload
     * @param metaType   slot selector
     * @throws Exception propagated from the upload call
     */
    private void uploadParts(MetricInfo metricInfo, MetaType metaType) throws Exception {
        // Constructor slot order: topology, component, worker, task, stream, netty.
        TopologyMetric payload;
        if (metaType == MetaType.TASK) {
            payload = new TopologyMetric(this.dummy, this.dummy, this.dummy, metricInfo, this.dummy, this.dummy);
        } else if (metaType == MetaType.STREAM) {
            payload = new TopologyMetric(this.dummy, this.dummy, this.dummy, this.dummy, metricInfo, this.dummy);
        } else if (metaType == MetaType.WORKER) {
            payload = new TopologyMetric(this.dummy, this.dummy, metricInfo, this.dummy, this.dummy, this.dummy);
        } else if (metaType == MetaType.NETTY) {
            payload = new TopologyMetric(this.dummy, this.dummy, this.dummy, this.dummy, this.dummy, metricInfo);
        } else {
            return;
        }
        this.client.getClient().uploadTopologyMetrics(this.topologyId, payload);
    }
}
