package com.alibaba.jstorm.task.master;

import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IDynamicComponent;
import backtype.storm.tuple.Tuple;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.task.master.ctrlevent.CtrlEventDispatcher;
import com.alibaba.jstorm.task.master.ctrlevent.UpdateConfigEvent;
import com.alibaba.jstorm.task.master.heartbeat.TaskHeartbeatUpdater;
import com.alibaba.jstorm.task.master.metrics.MetricRegister;
import com.alibaba.jstorm.task.master.metrics.MetricsMetaBroadcastEvent;
import com.alibaba.jstorm.task.master.metrics.MetricsUpdater;
import com.alibaba.jstorm.task.master.metrics.MetricsUploader;
import com.alibaba.jstorm.task.master.timer.WorkerSetUpdater;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/task/master/TopologyMaster.class */
public class TopologyMaster implements IBolt, IDynamicComponent {
    private static final Logger LOG = LoggerFactory.getLogger(TopologyMaster.class);
    private static final long serialVersionUID = 4690656768333833626L;
    public static final String WORKER_SET_UPDATER_NAME = "worker_set_updater";
    public static final String METRICS_UPLOADER_NAME = "metrics_uploader";
    public static final String MERTRICS_META_BROADCAST = "metrics_meta_broadcast";
    public static final String UPDATE_CONFIG_NAME = "update_config";
    public static final String FIELD_METRIC_WORKER = "worker";
    public static final String FIELD_METRIC_METRICS = "metrics";
    public static final String FIELD_REGISTER_METRICS = "regMetrics";
    public static final String FIELD_REGISTER_METRICS_RESP = "regMetricsResp";
    public static final String FILED_HEARBEAT_EVENT = "hbEvent";
    public static final String FILED_CTRL_EVENT = "ctrlEvent";
    private static int THREAD_POOL_SIZE;
    private TopologyMasterContext tmContext;
    private Map<String, TMHandler> handlers = new ConcurrentHashMap();
    private ScheduledExecutorService threadPools;

    public void createThreadPools(Map map) {
        THREAD_POOL_SIZE = ConfigExtension.getTopologyMasterThreadPoolSize(map);
        this.threadPools = Executors.newScheduledThreadPool(THREAD_POOL_SIZE);
    }

    public void registerHandlers() {
        TaskHeartbeatUpdater taskHeartbeatUpdater = new TaskHeartbeatUpdater();
        taskHeartbeatUpdater.init(this.tmContext);
        this.handlers.put(Common.TOPOLOGY_MASTER_HB_STREAM_ID, taskHeartbeatUpdater);
        this.handlers.put(UPDATE_CONFIG_NAME, taskHeartbeatUpdater);
        MetricsUpdater metricsUpdater = new MetricsUpdater();
        metricsUpdater.init(this.tmContext);
        this.handlers.put(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID, metricsUpdater);
        MetricRegister metricRegister = new MetricRegister();
        metricRegister.init(this.tmContext);
        this.handlers.put(Common.TOPOLOGY_MASTER_REGISTER_METRICS_STREAM_ID, metricRegister);
        this.handlers.put(MERTRICS_META_BROADCAST, metricRegister);
        this.threadPools.scheduleAtFixedRate(new TMEvent(metricRegister, new MetricsMetaBroadcastEvent()), 10L, 15L, TimeUnit.SECONDS);
        MetricsUploader metricsUploader = new MetricsUploader();
        metricsUploader.init(this.tmContext);
        this.handlers.put(METRICS_UPLOADER_NAME, metricsUploader);
        this.threadPools.scheduleAtFixedRate(new TMEvent(metricsUploader, null), 5L, 60L, TimeUnit.SECONDS);
        CtrlEventDispatcher ctrlEventDispatcher = new CtrlEventDispatcher();
        ctrlEventDispatcher.init(this.tmContext);
        this.handlers.put(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, ctrlEventDispatcher);
        this.handlers.put(UPDATE_CONFIG_NAME, ctrlEventDispatcher);
        WorkerSetUpdater workerSetUpdater = new WorkerSetUpdater();
        workerSetUpdater.init(this.tmContext);
        this.handlers.put(WORKER_SET_UPDATER_NAME, workerSetUpdater);
        this.threadPools.scheduleAtFixedRate(new TMEvent(workerSetUpdater, null), 10L, 10L, TimeUnit.SECONDS);
    }

    @Override // backtype.storm.task.IBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.tmContext = new TopologyMasterContext(map, topologyContext, outputCollector);
        createThreadPools(map);
        registerHandlers();
    }

    @Override // backtype.storm.task.IBolt
    public void execute(Tuple tuple) {
        TMHandler tMHandler = this.handlers.get(tuple.getSourceStreamId());
        if (tMHandler == null) {
            LOG.error("No handler of " + tuple.getSourceStreamId());
            this.tmContext.getCollector().fail(tuple);
        } else {
            this.threadPools.submit(new TMEvent(tMHandler, tuple));
            this.tmContext.getCollector().ack(tuple);
        }
    }

    @Override // backtype.storm.task.IBolt
    public void cleanup() {
        Iterator<Map.Entry<String, TMHandler>> it = this.handlers.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().cleanup();
        }
        this.handlers.clear();
        this.threadPools.shutdownNow();
        LOG.info("Successfully cleanup topology Master");
    }

    @Override // backtype.storm.topology.IDynamicComponent
    public void update(Map map) {
        LOG.info("Topology master received new conf:" + map);
        TMHandler tMHandler = this.handlers.get(UPDATE_CONFIG_NAME);
        if (tMHandler == null) {
            LOG.error("No handler to handle update config event");
        } else {
            this.threadPools.submit(new TMEvent(tMHandler, new UpdateConfigEvent(map)));
        }
    }
}
