/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.master;

import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.MetricSnapshot;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.generated.WorkerUploadMetrics;
import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IDynamicComponent;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.NimbusClient;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.metric.MetaType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.metric.TopologyMetricContext;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.backpressure.BackpressureCoordinator;
import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatUpdater;
import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bolt that dispatches the tuples it receives to one of three handlers,
 * selected by the tuple's source stream id:
 * <ul>
 *   <li>{@code __master_task_heartbeat} - handed to {@link TaskHeartbeatUpdater};</li>
 *   <li>{@code __master_metrics} - cached in {@link TopologyMetricContext} and,
 *       once the context reports it is ready, merged and uploaded through a
 *       {@link NimbusClient};</li>
 *   <li>{@code __master_control_stream} - handed to {@link BackpressureCoordinator}.</li>
 * </ul>
 *
 * <p>Threading: {@link #execute(Tuple)} only enqueues the tuple and acks it.
 * A dedicated consumer thread drains the queue and runs the handlers; a
 * single-threaded scheduled executor performs the metric merge/upload, both on
 * demand and as a once-a-minute fallback.
 */
public class TopologyMaster implements IBolt, IDynamicComponent {
    private static final long serialVersionUID = 4690656768333833626L;
    private static final Logger LOG = LoggerFactory.getLogger(TopologyMaster.class);
    /** Metric-related messages go to the logger of {@link TopologyMetricContext}. */
    private static final Logger METRIC_LOG = LoggerFactory.getLogger(TopologyMetricContext.class);

    /** Threshold (number of metric entries) above which uploads are split into batches. */
    public static final int MAX_BATCH_SIZE = 10000;
    public static final String FIELD_METRIC_WORKER = "worker";
    public static final String FIELD_METRIC_METRICS = "metrics";
    // The two names below are misspelled but public, so they are kept for compatibility.
    public static final String FILED_HEARBEAT_EVENT = "hbEvent";
    public static final String FILED_CTRL_EVENT = "ctrlEvent";

    /** Value of {@code TimeUtils.secOffset()} at which the periodic forced upload fires. */
    private static final int UPLOAD_SECOND_OFFSET = 35;

    /** Placeholder passed for the metric groups that a partial upload does not carry. */
    private final MetricInfo dummy = MetricUtils.mkMetricInfo();

    private Map conf;
    private StormClusterState zkCluster;
    private OutputCollector collector;
    private int taskId;
    private String topologyId;
    private volatile Set<ResourceWorkerSlot> workerSet;
    /** Throttles how often the worker set is re-read from the cluster state. */
    private IntervalCheck intervalCheck;
    private TaskHeartbeatUpdater taskHeartbeatUpdater;
    private BackpressureCoordinator backpressureCoordinator;
    private TopologyMetricContext topologyMetricContext;
    private ScheduledExecutorService uploadMetricsExecutor;
    /** Consumer thread draining {@link #queue}; restarted by {@link #execute(Tuple)} if found dead. */
    private Thread updateThread;
    private BlockingQueue<Tuple> queue = new LinkedBlockingDeque<Tuple>();
    /** Throttles the liveness check of {@link #updateThread}. */
    private IntervalCheck threadAliveCheck;
    private volatile boolean isActive = true;

    /**
     * Initializes the handlers, schedules the periodic forced metric upload and
     * starts the consumer thread.
     *
     * @param stormConf unused; the configuration is taken from {@code context.getStormConf()}
     * @param context   supplies the configuration, task id, topology id and cluster state
     * @param collector used to ack incoming tuples and passed to the backpressure coordinator
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = context.getStormConf();
        this.collector = collector;
        this.taskId = context.getThisTaskId();
        this.topologyId = context.getTopologyId();
        this.zkCluster = context.getZkCluster();

        // Created outside the try block: updateTopologyWorkerSet() dereferences it
        // on every event, so it must exist even when the initial lookup fails.
        this.intervalCheck = new IntervalCheck();
        this.intervalCheck.setInterval(10L);
        this.intervalCheck.start();
        try {
            Assignment assignment = this.zkCluster.assignment_info(this.topologyId, null);
            this.workerSet = assignment.getWorkers();
        } catch (Exception e) {
            LOG.error("Failed to get assignment for " + this.topologyId, e);
        }

        this.taskHeartbeatUpdater = new TaskHeartbeatUpdater(this.conf, this.topologyId, this.taskId, this.zkCluster);
        this.backpressureCoordinator = new BackpressureCoordinator(collector, context, this.taskId);
        this.topologyMetricContext = new TopologyMetricContext(this.topologyId, this.workerSet, this.conf);

        this.uploadMetricsExecutor = Executors.newSingleThreadScheduledExecutor();
        this.uploadMetricsExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // scheduleAtFixedRate suppresses every later run once one run
                // throws, so nothing may escape from here.
                try {
                    sleepUntilUploadOffset();
                    if (!TopologyMaster.this.isActive) {
                        return;
                    }
                    if (TopologyMaster.this.topologyMetricContext.getUploadedWorkerNum() > 0) {
                        METRIC_LOG.info("force upload metrics.");
                        TopologyMaster.this.mergeAndUpload();
                    }
                } catch (Exception e) {
                    LOG.error("Failed to force upload metrics", e);
                }
            }
        }, 5L, 60L, TimeUnit.SECONDS);

        this.updateThread = new Thread(new TopologyMasterRunnable());
        this.updateThread.start();
        this.threadAliveCheck = new IntervalCheck();
        this.threadAliveCheck.setInterval(30L);
        this.threadAliveCheck.start();
    }

    /**
     * Enqueues the tuple for the consumer thread and acks it. Whenever
     * {@code threadAliveCheck} fires, a consumer thread that is missing or no
     * longer alive is replaced by a new one.
     *
     * @param input the received tuple; {@code null} is logged and otherwise ignored
     */
    @Override
    public void execute(Tuple input) {
        if (input == null) {
            LOG.error("Received null tuple!");
            return;
        }
        try {
            this.queue.put(input);
        } catch (InterruptedException e) {
            LOG.error("Failed to put event to taskHb updater's queue", e);
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        if (this.threadAliveCheck.check() && (this.updateThread == null || !this.updateThread.isAlive())) {
            this.updateThread = new Thread(new TopologyMasterRunnable());
            this.updateThread.start();
        }
        this.collector.ack(input);
    }

    /**
     * Marks the bolt inactive and stops its background threads: the upload
     * executor is shut down and the consumer thread is interrupted so that it
     * leaves {@code queue.take()} and observes the inactive flag.
     */
    @Override
    public void cleanup() {
        this.isActive = false;
        if (this.uploadMetricsExecutor != null) {
            this.uploadMetricsExecutor.shutdownNow();
        }
        if (this.updateThread != null) {
            this.updateThread.interrupt();
        }
        LOG.info("Successfully cleanup");
    }

    /**
     * Re-reads the topology's worker set from the cluster state, at most as
     * often as {@code intervalCheck} allows. On failure the previous set is kept.
     *
     * <p>NOTE(review): the refreshed set is only stored in {@code this.workerSet};
     * {@code topologyMetricContext} was constructed with the set read in
     * {@code prepare} and nothing visible here hands it the new one - confirm
     * whether that is intended.
     */
    private void updateTopologyWorkerSet() {
        if (!this.intervalCheck.check()) {
            return;
        }
        try {
            Assignment assignment = this.zkCluster.assignment_info(this.topologyId, null);
            this.workerSet = assignment.getWorkers();
        } catch (Exception e) {
            LOG.error("Failed to get assignment for " + this.topologyId, e);
        }
    }

    /**
     * Routes one tuple to its handler based on the source stream id. Tuples
     * from any other stream are ignored; handler failures are logged, not rethrown.
     */
    private void eventHandle(Tuple input) {
        this.updateTopologyWorkerSet();
        String stream = input.getSourceStreamId();
        try {
            if (stream.equals("__master_task_heartbeat")) {
                this.taskHeartbeatUpdater.process(input);
            } else if (stream.equals("__master_metrics")) {
                this.updateMetrics(input);
            } else if (stream.equals("__master_control_stream")) {
                this.backpressureCoordinator.process(input);
            }
        } catch (Exception e) {
            LOG.error("Failed to handle event: " + input.toString(), e);
        }
    }

    /**
     * Receives a new configuration and forwards it to the backpressure
     * coordinator when the coordinator reports a relevant change.
     *
     * @param conf the new configuration
     */
    @Override
    public void update(Map conf) {
        LOG.info("Topology master received new conf:" + conf);
        if (this.backpressureCoordinator.isBackpressureConfigChange(conf)) {
            this.backpressureCoordinator.updateBackpressureConfig(conf);
        }
    }

    /**
     * Caches the metrics carried by a metrics tuple and, once the metric
     * context reports it is ready, submits a merge-and-upload job to the
     * upload executor.
     */
    private void updateMetrics(Tuple input) {
        String workerSlot = (String) input.getValueByField(FIELD_METRIC_WORKER);
        WorkerUploadMetrics metrics = (WorkerUploadMetrics) input.getValueByField(FIELD_METRIC_METRICS);
        MetricInfo allMetrics = metrics.get_allMetrics();
        this.topologyMetricContext.addToMemCache(workerSlot, allMetrics);
        METRIC_LOG.info("received metrics from:{}, size:{}", workerSlot, allMetrics.get_metrics_size());

        if (this.topologyMetricContext.readyToUpload()) {
            METRIC_LOG.info("all {} worker slots have updated metrics, start merging & uploading...",
                    this.topologyMetricContext.getWorkerNum());
            this.uploadMetricsExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    TopologyMaster.this.mergeAndUpload();
                }
            });
        }
    }

    /**
     * Merges the cached worker metrics, uploads the result if there is one and
     * resets the cache. Does nothing when no worker has uploaded metrics.
     */
    private void mergeAndUpload() {
        if (this.topologyMetricContext.getUploadedWorkerNum() > 0) {
            TopologyMetric tpMetric = this.topologyMetricContext.mergeMetrics();
            if (tpMetric != null) {
                this.uploadMetrics(tpMetric);
            }
            this.topologyMetricContext.resetUploadedMetrics();
        }
    }

    /**
     * Uploads a merged metric set through a freshly configured Nimbus client.
     * Skipped entirely in local mode. When the total entry count is below
     * {@link #MAX_BATCH_SIZE} everything goes out in one call; otherwise the
     * topology and component metrics are sent first and the remaining groups
     * are uploaded one by one, each split into batches if needed. Failures are
     * logged and the client is always closed.
     *
     * @param tpMetric the merged metrics to upload
     */
    private void uploadMetrics(TopologyMetric tpMetric) {
        long start = System.currentTimeMillis();
        if (StormConfig.local_mode(this.conf)) {
            return;
        }
        NimbusClient client = null;
        try {
            client = NimbusClient.getConfiguredClient(this.conf);
            Nimbus.Client client1 = client.getClient();

            MetricInfo topologyMetrics = tpMetric.get_topologyMetric();
            MetricInfo componentMetrics = tpMetric.get_componentMetric();
            MetricInfo taskMetrics = tpMetric.get_taskMetric();
            MetricInfo streamMetrics = tpMetric.get_streamMetric();
            MetricInfo workerMetrics = tpMetric.get_workerMetric();
            MetricInfo nettyMetrics = tpMetric.get_nettyMetric();

            int totalSize = topologyMetrics.get_metrics_size() + componentMetrics.get_metrics_size()
                    + taskMetrics.get_metrics_size() + streamMetrics.get_metrics_size()
                    + workerMetrics.get_metrics_size() + nettyMetrics.get_metrics_size();

            if (totalSize < MAX_BATCH_SIZE) {
                client1.uploadTopologyMetrics(this.topologyId, new TopologyMetric(
                        topologyMetrics, componentMetrics, workerMetrics, taskMetrics, streamMetrics, nettyMetrics));
            } else {
                client1.uploadTopologyMetrics(this.topologyId, new TopologyMetric(
                        topologyMetrics, componentMetrics, this.dummy, this.dummy, this.dummy, this.dummy));
                this.batchUploadMetrics(client1, this.topologyId, workerMetrics, MetaType.WORKER);
                this.batchUploadMetrics(client1, this.topologyId, taskMetrics, MetaType.TASK);
                this.batchUploadMetrics(client1, this.topologyId, streamMetrics, MetaType.STREAM);
                this.batchUploadMetrics(client1, this.topologyId, nettyMetrics, MetaType.NETTY);
            }
        } catch (Exception e) {
            // This runs on the upload executor; an escaping exception would
            // cancel the periodic upload task, so log and carry on.
            LOG.error("Failed to upload worker metrics", e);
        } finally {
            if (client != null) {
                client.close();
            }
        }
        METRIC_LOG.info("upload metrics, cost:{}", System.currentTimeMillis() - start);
    }

    /**
     * Uploads one metric group, splitting it into chunks of {@link #MAX_BATCH_SIZE}
     * entries when it is larger than that.
     */
    private void batchUploadMetrics(Nimbus.Client client, String topologyId, MetricInfo metricInfo, MetaType metaType) {
        if (metricInfo.get_metrics_size() <= MAX_BATCH_SIZE) {
            this.doUpload(client, topologyId, metricInfo, metaType);
            return;
        }

        Map<String, Map<Integer, MetricSnapshot>> data = metricInfo.get_metrics();
        Map<String, Map<Integer, MetricSnapshot>> part = Maps.newHashMapWithExpectedSize(MAX_BATCH_SIZE);
        for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : data.entrySet()) {
            part.put(entry.getKey(), entry.getValue());
            if (part.size() >= MAX_BATCH_SIZE) {
                this.uploadPart(client, topologyId, part, metaType);
                // A fresh map per batch: the one just sent is never mutated
                // afterwards, whatever the client does with the reference.
                part = Maps.newHashMapWithExpectedSize(MAX_BATCH_SIZE);
            }
        }
        if (!part.isEmpty()) {
            this.uploadPart(client, topologyId, part, metaType);
        }
    }

    /** Wraps one batch of entries in its own {@link MetricInfo} and uploads it. */
    private void uploadPart(Nimbus.Client client, String topologyId,
                            Map<String, Map<Integer, MetricSnapshot>> part, MetaType metaType) {
        MetricInfo uploadPart = new MetricInfo();
        uploadPart.set_metrics(part);
        this.doUpload(client, topologyId, uploadPart, metaType);
    }

    /**
     * Uploads {@code part} in the {@link TopologyMetric} slot that corresponds to
     * {@code metaType}, with the placeholder in every other slot. Meta types
     * other than TASK, STREAM, WORKER and NETTY are ignored. Failures are
     * logged so that remaining batches are still attempted.
     */
    private void doUpload(Nimbus.Client client, String topologyId, MetricInfo part, MetaType metaType) {
        try {
            // Constructor slots, in order: topology, component, worker, task, stream, netty.
            TopologyMetric payload;
            if (metaType == MetaType.TASK) {
                payload = new TopologyMetric(this.dummy, this.dummy, this.dummy, part, this.dummy, this.dummy);
            } else if (metaType == MetaType.STREAM) {
                payload = new TopologyMetric(this.dummy, this.dummy, this.dummy, this.dummy, part, this.dummy);
            } else if (metaType == MetaType.WORKER) {
                payload = new TopologyMetric(this.dummy, this.dummy, part, this.dummy, this.dummy, this.dummy);
            } else if (metaType == MetaType.NETTY) {
                payload = new TopologyMetric(this.dummy, this.dummy, this.dummy, this.dummy, this.dummy, part);
            } else {
                return;
            }
            client.uploadTopologyMetrics(topologyId, payload);
        } catch (Exception ex) {
            LOG.error("Error", ex);
        }
    }

    /** Pauses the calling thread until {@code TimeUtils.secOffset()} would equal {@link #UPLOAD_SECOND_OFFSET}. */
    private static void sleepUntilUploadOffset() {
        // presumably secOffset() is the second within the current minute - TODO confirm
        int secOffset = TimeUtils.secOffset();
        if (secOffset < UPLOAD_SECOND_OFFSET) {
            JStormUtils.sleepMs((UPLOAD_SECOND_OFFSET - secOffset) * 1000);
        } else if (secOffset != UPLOAD_SECOND_OFFSET) {
            JStormUtils.sleepMs((60 - secOffset + UPLOAD_SECOND_OFFSET) * 1000);
        }
    }

    /**
     * Consumer loop: takes tuples off the queue and hands them to
     * {@code eventHandle} until the bolt is marked inactive.
     */
    private class TopologyMasterRunnable implements Runnable {
        private TopologyMasterRunnable() {
        }

        @Override
        public void run() {
            while (TopologyMaster.this.isActive) {
                try {
                    Tuple event = TopologyMaster.this.queue.take();
                    if (event != null) {
                        TopologyMaster.this.eventHandle(event);
                    }
                } catch (InterruptedException e) {
                    // cleanup() interrupts this thread after clearing isActive;
                    // in that case the loop condition ends the thread quietly.
                    if (TopologyMaster.this.isActive) {
                        LOG.error("Failed to process event", e);
                    }
                } catch (Throwable e) {
                    LOG.error("Failed to process event", e);
                }
            }
        }
    }
}

