package com.alibaba.jstorm.task.master;

import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.MetricSnapshot;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.generated.WorkerUploadMetrics;
import backtype.storm.security.auth.ThriftClient;
import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IDynamicComponent;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.NimbusClient;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.metric.MetaType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.metric.TopologyMetricContext;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.backpressure.BackpressureCoordinator;
import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatUpdater;
import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/task/master/TopologyMaster.class */
/**
 * Bolt that acts as the per-topology master task.
 *
 * <p>Tuples handed to {@link #execute(Tuple)} are queued and processed asynchronously by a
 * dedicated update thread, which dispatches on the tuple's source stream id:
 * <ul>
 *   <li>{@code Common.TOPOLOGY_MASTER_HB_STREAM_ID} goes to the {@link TaskHeartbeatUpdater};</li>
 *   <li>{@code Common.TOPOLOGY_MASTER_METRICS_STREAM_ID} is cached in the
 *       {@link TopologyMetricContext} and, once that context reports it is ready, merged and
 *       uploaded to Nimbus;</li>
 *   <li>{@code Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID} goes to the
 *       {@link BackpressureCoordinator}.</li>
 * </ul>
 *
 * <p>A single-threaded scheduled executor additionally forces an upload once per minute whenever
 * at least one worker has uploaded metrics.
 */
public class TopologyMaster implements IBolt, IDynamicComponent {
    private static final long serialVersionUID = 4690656768333833626L;
    private static final Logger LOG = LoggerFactory.getLogger(TopologyMaster.class);
    public static final int MAX_BATCH_SIZE = 10000;
    public static final String FIELD_METRIC_WORKER = "worker";
    public static final String FIELD_METRIC_METRICS = "metrics";
    public static final String FILED_HEARBEAT_EVENT = "hbEvent";
    public static final String FILED_CTRL_EVENT = "ctrlEvent";

    /** Second-within-the-minute at which the periodic forced upload is performed. */
    private static final int UPLOAD_SECOND_OFFSET = 35;
    /** Seconds in one minute, used to wrap the offset computation to the next minute. */
    private static final int SECONDS_PER_MINUTE = 60;
    /** Conversion factor for the millisecond sleeps of the periodic upload task. */
    private static final int MILLIS_PER_SECOND = 1000;

    private Map conf;
    private StormClusterState zkCluster;
    private OutputCollector collector;
    private int taskId;
    private String topologyId;
    private volatile Set<ResourceWorkerSlot> workerSet;
    private IntervalCheck intervalCheck;
    private TaskHeartbeatUpdater taskHeartbeatUpdater;
    private BackpressureCoordinator backpressureCoordinator;
    private TopologyMetricContext topologyMetricContext;
    private ScheduledExecutorService uploadMetricsExecutor;
    private Thread updateThread;
    private IntervalCheck threadAliveCheck;
    private final Logger metricLogger = LoggerFactory.getLogger(TopologyMetricContext.class);
    /** Empty placeholder used for the metric categories that a partial upload does not carry. */
    private final MetricInfo dummy = MetricUtils.mkMetricInfo();
    private BlockingQueue<Tuple> queue = new LinkedBlockingDeque<Tuple>();
    private volatile boolean isActive = true;

    /**
     * Drains the tuple queue and hands every tuple to {@link #eventHandle(Tuple)} until the bolt
     * is deactivated by {@link #cleanup()}.
     */
    private class TopologyMasterRunnable implements Runnable {
        private TopologyMasterRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (TopologyMaster.this.isActive) {
                try {
                    // take() blocks until a tuple is available and never returns null.
                    TopologyMaster.this.eventHandle(TopologyMaster.this.queue.take());
                } catch (InterruptedException e) {
                    // cleanup() interrupts this thread to unblock take(); that is an orderly
                    // shutdown, so only report interrupts that arrive while still active.
                    if (TopologyMaster.this.isActive) {
                        TopologyMaster.LOG.error("Failed to process event", e);
                    }
                } catch (Throwable th) {
                    TopologyMaster.LOG.error("Failed to process event", th);
                }
            }
        }
    }

    /**
     * Initializes the helpers, schedules the periodic forced metric upload and starts the update
     * thread.
     *
     * @param map              ignored; the configuration is taken from {@code topologyContext}
     * @param topologyContext  source of the configuration, task id, topology id and ZK state
     * @param outputCollector  collector used to ack tuples and by the backpressure coordinator
     */
    @Override // backtype.storm.task.IBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.conf = topologyContext.getStormConf();
        this.collector = outputCollector;
        this.taskId = topologyContext.getThisTaskId();
        this.topologyId = topologyContext.getTopologyId();
        this.zkCluster = topologyContext.getZkCluster();

        // Created unconditionally: updateTopologyWorkerSet() dereferences it for every event, so
        // it must exist even when the initial assignment lookup below fails.
        this.intervalCheck = new IntervalCheck();
        this.intervalCheck.setInterval(10L);
        this.intervalCheck.start();
        try {
            this.workerSet = this.zkCluster.assignment_info(this.topologyId, null).getWorkers();
        } catch (Exception e) {
            // NOTE(review): workerSet stays null here and is passed to TopologyMetricContext
            // below - confirm that class tolerates a null worker set.
            LOG.error("Failed to get assignment for " + this.topologyId, e);
        }

        this.taskHeartbeatUpdater = new TaskHeartbeatUpdater(this.conf, this.topologyId, this.taskId, this.zkCluster);
        this.backpressureCoordinator = new BackpressureCoordinator(outputCollector, topologyContext, Integer.valueOf(this.taskId));
        this.topologyMetricContext = new TopologyMetricContext(this.topologyId, this.workerSet, this.conf);

        this.uploadMetricsExecutor = Executors.newSingleThreadScheduledExecutor();
        this.uploadMetricsExecutor.scheduleAtFixedRate(new Runnable() { // from class: com.alibaba.jstorm.task.master.TopologyMaster.1
            @Override // java.lang.Runnable
            public void run() {
                // An exception escaping a scheduleAtFixedRate task suppresses all subsequent
                // executions, so nothing may propagate out of this method.
                try {
                    // Sleep until the next time the second offset equals UPLOAD_SECOND_OFFSET.
                    // Presumably secOffset() is the second within the current minute - confirm.
                    int secOffset = TimeUtils.secOffset();
                    if (secOffset < UPLOAD_SECOND_OFFSET) {
                        JStormUtils.sleepMs((UPLOAD_SECOND_OFFSET - secOffset) * MILLIS_PER_SECOND);
                    } else if (secOffset != UPLOAD_SECOND_OFFSET) {
                        JStormUtils.sleepMs(((SECONDS_PER_MINUTE - secOffset) + UPLOAD_SECOND_OFFSET) * MILLIS_PER_SECOND);
                    }
                    if (!TopologyMaster.this.isActive) {
                        // cleanup() ran while this task was sleeping.
                        return;
                    }
                    if (TopologyMaster.this.topologyMetricContext.getUploadedWorkerNum() > 0) {
                        TopologyMaster.this.metricLogger.info("force upload metrics.");
                        TopologyMaster.this.mergeAndUpload();
                    }
                } catch (Throwable th) {
                    TopologyMaster.LOG.error("Failed to force upload metrics", th);
                }
            }
        }, 5L, 60L, TimeUnit.SECONDS);

        startUpdateThread();
        this.threadAliveCheck = new IntervalCheck();
        this.threadAliveCheck.setInterval(30L);
        this.threadAliveCheck.start();
    }

    /**
     * Queues the tuple for asynchronous processing, restarts the update thread if the periodic
     * liveness check finds it dead, and acks the tuple.
     *
     * @param tuple the incoming tuple; a {@code null} tuple is logged and dropped
     */
    @Override // backtype.storm.task.IBolt
    public void execute(Tuple tuple) {
        if (tuple == null) {
            LOG.error("Received null tuple!");
            return;
        }
        try {
            this.queue.put(tuple);
        } catch (InterruptedException e) {
            LOG.error("Failed to put event to taskHb updater's queue", e);
            // Preserve the interrupt status for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        if (this.threadAliveCheck.check() && (this.updateThread == null || !this.updateThread.isAlive())) {
            startUpdateThread();
        }
        this.collector.ack(tuple);
    }

    /**
     * Deactivates the bolt and releases its background resources: the scheduled upload executor
     * is shut down and the update thread is interrupted so it stops blocking on the queue.
     */
    @Override // backtype.storm.task.IBolt
    public void cleanup() {
        this.isActive = false;
        if (this.uploadMetricsExecutor != null) {
            this.uploadMetricsExecutor.shutdownNow();
        }
        if (this.updateThread != null) {
            this.updateThread.interrupt();
        }
        LOG.info("Successfully cleanup");
    }

    /** Creates and starts a fresh update thread running {@link TopologyMasterRunnable}. */
    private void startUpdateThread() {
        this.updateThread = new Thread(new TopologyMasterRunnable());
        this.updateThread.start();
    }

    /**
     * Re-reads the topology's worker set from the cluster state whenever the interval check
     * fires. A failed lookup is logged and the previous worker set is kept.
     */
    private void updateTopologyWorkerSet() {
        if (this.intervalCheck.check()) {
            try {
                this.workerSet = this.zkCluster.assignment_info(this.topologyId, null).getWorkers();
            } catch (Exception e) {
                LOG.error("Failed to get assignment for " + this.topologyId, e);
            }
        }
    }

    /**
     * Dispatches one queued tuple to the handler that matches its source stream id. Tuples from
     * other streams are ignored; handler exceptions are logged and not propagated.
     *
     * @param tuple the tuple to process
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void eventHandle(Tuple tuple) {
        updateTopologyWorkerSet();
        String sourceStreamId = tuple.getSourceStreamId();
        try {
            if (sourceStreamId.equals(Common.TOPOLOGY_MASTER_HB_STREAM_ID)) {
                this.taskHeartbeatUpdater.process(tuple);
            } else if (sourceStreamId.equals(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID)) {
                updateMetrics(tuple);
            } else if (sourceStreamId.equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
                this.backpressureCoordinator.process(tuple);
            }
        } catch (Exception e) {
            LOG.error("Failed to handle event: " + tuple.toString(), e);
        }
    }

    /**
     * Receives a new configuration and forwards it to the backpressure coordinator when that
     * coordinator reports a backpressure-related change.
     *
     * @param map the new configuration
     */
    @Override // backtype.storm.topology.IDynamicComponent
    public void update(Map map) {
        LOG.info("Topology master received new conf:" + map);
        if (this.backpressureCoordinator.isBackpressureConfigChange(map)) {
            this.backpressureCoordinator.updateBackpressureConfig(map);
        }
    }

    /**
     * Caches the metrics carried by a metrics-stream tuple and, once the metric context reports
     * it is ready to upload, schedules a merge-and-upload on the upload executor.
     *
     * @param tuple tuple carrying the {@link #FIELD_METRIC_WORKER} and
     *              {@link #FIELD_METRIC_METRICS} fields
     */
    private void updateMetrics(Tuple tuple) {
        String worker = (String) tuple.getValueByField(FIELD_METRIC_WORKER);
        WorkerUploadMetrics workerUploadMetrics = (WorkerUploadMetrics) tuple.getValueByField(FIELD_METRIC_METRICS);
        this.topologyMetricContext.addToMemCache(worker, workerUploadMetrics.get_allMetrics());
        this.metricLogger.info("received metrics from:{}, size:{}", worker, Integer.valueOf(workerUploadMetrics.get_allMetrics().get_metrics_size()));
        if (this.topologyMetricContext.readyToUpload()) {
            this.metricLogger.info("all {} worker slots have updated metrics, start merging & uploading...", Integer.valueOf(this.topologyMetricContext.getWorkerNum()));
            this.uploadMetricsExecutor.submit(new Runnable() { // from class: com.alibaba.jstorm.task.master.TopologyMaster.2
                @Override // java.lang.Runnable
                public void run() {
                    // The Future returned by submit() is discarded, so a failure would vanish
                    // silently unless it is logged here.
                    try {
                        TopologyMaster.this.mergeAndUpload();
                    } catch (Throwable th) {
                        TopologyMaster.LOG.error("Failed to merge and upload metrics", th);
                    }
                }
            });
        }
    }

    /**
     * Merges the cached worker metrics, uploads the merged result when there is one, and resets
     * the uploaded-metrics cache. Does nothing when no worker has uploaded metrics.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void mergeAndUpload() {
        if (this.topologyMetricContext.getUploadedWorkerNum() > 0) {
            TopologyMetric mergeMetrics = this.topologyMetricContext.mergeMetrics();
            if (mergeMetrics != null) {
                uploadMetrics(mergeMetrics);
            }
            this.topologyMetricContext.resetUploadedMetrics();
        }
    }

    /**
     * Uploads the merged metrics to Nimbus; skipped entirely in local mode.
     *
     * <p>When the total number of metrics is below {@link #MAX_BATCH_SIZE} everything is sent in
     * one call. Otherwise topology and component metrics are sent first and the worker, task,
     * stream and netty metrics follow in separate batched calls. Failures are logged, not
     * propagated, and the Nimbus client is always closed.
     *
     * @param topologyMetric the merged metrics to upload
     */
    private void uploadMetrics(TopologyMetric topologyMetric) {
        long start = System.currentTimeMillis();
        if (StormConfig.local_mode(this.conf)) {
            return;
        }
        NimbusClient nimbusClient = null;
        try {
            nimbusClient = NimbusClient.getConfiguredClient(this.conf);
            Nimbus.Client client = nimbusClient.getClient();
            MetricInfo topologyMetrics = topologyMetric.get_topologyMetric();
            MetricInfo componentMetrics = topologyMetric.get_componentMetric();
            MetricInfo taskMetrics = topologyMetric.get_taskMetric();
            MetricInfo streamMetrics = topologyMetric.get_streamMetric();
            MetricInfo workerMetrics = topologyMetric.get_workerMetric();
            MetricInfo nettyMetrics = topologyMetric.get_nettyMetric();
            int totalSize = topologyMetrics.get_metrics_size() + componentMetrics.get_metrics_size()
                    + taskMetrics.get_metrics_size() + streamMetrics.get_metrics_size()
                    + workerMetrics.get_metrics_size() + nettyMetrics.get_metrics_size();
            if (totalSize < MAX_BATCH_SIZE) {
                client.uploadTopologyMetrics(this.topologyId, new TopologyMetric(topologyMetrics, componentMetrics, workerMetrics, taskMetrics, streamMetrics, nettyMetrics));
            } else {
                client.uploadTopologyMetrics(this.topologyId, new TopologyMetric(topologyMetrics, componentMetrics, this.dummy, this.dummy, this.dummy, this.dummy));
                batchUploadMetrics(client, this.topologyId, workerMetrics, MetaType.WORKER);
                batchUploadMetrics(client, this.topologyId, taskMetrics, MetaType.TASK);
                batchUploadMetrics(client, this.topologyId, streamMetrics, MetaType.STREAM);
                batchUploadMetrics(client, this.topologyId, nettyMetrics, MetaType.NETTY);
            }
        } catch (Exception e) {
            LOG.error("Failed to upload worker metrics", e);
        } finally {
            // Close on every path so a failed upload does not leak the Nimbus connection.
            if (nimbusClient != null) {
                nimbusClient.close();
            }
        }
        this.metricLogger.info("upload metrics, cost:{}", Long.valueOf(System.currentTimeMillis() - start));
    }

    /**
     * Uploads one metric category, splitting it into chunks of at most {@link #MAX_BATCH_SIZE}
     * top-level entries when it is larger than that.
     *
     * @param client     Nimbus client to upload through
     * @param str        topology id
     * @param metricInfo metrics of a single category
     * @param metaType   category of {@code metricInfo}
     */
    private void batchUploadMetrics(Nimbus.Client client, String str, MetricInfo metricInfo, MetaType metaType) {
        if (metricInfo.get_metrics_size() <= MAX_BATCH_SIZE) {
            doUpload(client, str, metricInfo, metaType);
            return;
        }
        Map<String, Map<Integer, MetricSnapshot>> allMetrics = metricInfo.get_metrics();
        Map<String, Map<Integer, MetricSnapshot>> batch = Maps.newHashMapWithExpectedSize(MAX_BATCH_SIZE);
        MetricInfo batchInfo = new MetricInfo();
        for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : allMetrics.entrySet()) {
            batch.put(entry.getKey(), entry.getValue());
            if (batch.size() >= MAX_BATCH_SIZE) {
                batchInfo.set_metrics(batch);
                doUpload(client, str, batchInfo, metaType);
                // The same map instance is reused for the next chunk.
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            batchInfo.set_metrics(batch);
            doUpload(client, str, batchInfo, metaType);
        }
    }

    /**
     * Sends {@code metricInfo} to Nimbus in the slot matching {@code metaType}, with the dummy
     * placeholder in every other slot. Other meta types are ignored; failures are logged.
     *
     * @param client     Nimbus client to upload through
     * @param str        topology id
     * @param metricInfo metrics to upload
     * @param metaType   category that decides which slot {@code metricInfo} occupies
     */
    private void doUpload(Nimbus.Client client, String str, MetricInfo metricInfo, MetaType metaType) {
        try {
            if (metaType == MetaType.TASK) {
                client.uploadTopologyMetrics(str, new TopologyMetric(this.dummy, this.dummy, this.dummy, metricInfo, this.dummy, this.dummy));
            } else if (metaType == MetaType.STREAM) {
                client.uploadTopologyMetrics(str, new TopologyMetric(this.dummy, this.dummy, this.dummy, this.dummy, metricInfo, this.dummy));
            } else if (metaType == MetaType.WORKER) {
                client.uploadTopologyMetrics(str, new TopologyMetric(this.dummy, this.dummy, metricInfo, this.dummy, this.dummy, this.dummy));
            } else if (metaType == MetaType.NETTY) {
                client.uploadTopologyMetrics(str, new TopologyMetric(this.dummy, this.dummy, this.dummy, this.dummy, this.dummy, metricInfo));
            }
        } catch (Exception e) {
            LOG.error("Error", e);
        }
    }
}
