package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.Config;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.scheduler.INimbus;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.Callback;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.config.RefreshableComponents;
import com.alibaba.jstorm.daemon.nimbus.metric.ClusterMetricsRunnable;
import com.alibaba.jstorm.daemon.supervisor.Httpserver;
import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb;
import com.alibaba.jstorm.schedule.CleanRunnable;
import com.alibaba.jstorm.schedule.FollowerRunnable;
import com.alibaba.jstorm.schedule.MonitorRunnable;
import com.alibaba.jstorm.utils.DefaultUncaughtExceptionHandler;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.org.apache.thrift.TException;
import shade.storm.org.apache.thrift.protocol.TBinaryProtocol;
import shade.storm.org.apache.thrift.server.THsHaServer;
import shade.storm.org.apache.thrift.transport.TNonblockingServerSocket;
import shade.storm.org.apache.thrift.transport.TTransportException;

/* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/NimbusServer.class */
/**
 * Entry point and lifecycle owner of the nimbus daemon.
 *
 * <p>In distributed mode ({@link #main(String[])}) the server writes its pid file, registers a
 * JVM shutdown hook, starts the follower thread, the daemon HTTP server and the container
 * heartbeat thread, and finally serves the Thrift API. In local mode
 * ({@link #launcherLocalServer(Map, INimbus)}) it only builds the nimbus state and returns the
 * in-process {@link ServiceHandler}.
 *
 * <p>NOTE(review): {@code smartThreads} is a plain {@link ArrayList}. It is appended to from
 * {@code initContainerHBThread} and {@code initMetricRunnable} and iterated in
 * {@link #cleanup()}; if the follower callback runs {@link #init(Map)} on another thread, these
 * accesses are unsynchronized - confirm against {@code FollowerRunnable}.
 */
public class NimbusServer {
    private static final Logger LOG = LoggerFactory.getLogger(NimbusServer.class);

    /** Shared nimbus state; {@code null} until one of the launch methods has created it. */
    private NimbusData data;
    /** Handler behind the Thrift API; also returned directly to local-mode callers. */
    private ServiceHandler serviceHandler;
    private TopologyAssign topologyAssign;
    private THsHaServer thriftServer;
    private FollowerRunnable follower;
    private Httpserver hs;
    /** Background loop threads that {@link #cleanup()} has to stop. */
    private final List<AsyncLoopThread> smartThreads = new ArrayList<AsyncLoopThread>();

    /**
     * Starts a distributed-mode nimbus using the configuration returned by
     * {@code Utils.readStormConfig()} and a {@code DefaultInimbus}.
     *
     * @param strArr command line arguments (not used)
     * @throws Exception if reading the configuration or starting the JVM monitor fails
     */
    public static void main(String[] strArr) throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(new DefaultUncaughtExceptionHandler());
        Map conf = Utils.readStormConfig();
        JStormServerUtils.startTaobaoJvmMonitor();
        new NimbusServer().launchServer(conf, new DefaultInimbus());
    }

    /** Writes the pid file of this process under the master pid directory of {@code map}. */
    private void createPid(Map map) throws Exception {
        JStormServerUtils.createPid(StormConfig.masterPids(map));
    }

    /**
     * Boots nimbus in distributed mode and serves the Thrift API.
     *
     * <p>Any {@link Throwable} raised during start-up or while serving is logged rather than
     * propagated. {@link #cleanup()} runs exactly once on the way out, whether start-up failed
     * or {@code initThrift} returned.
     *
     * @param map      the nimbus configuration
     * @param iNimbus  the scheduler-side nimbus implementation to prepare and hand to the state
     */
    private void launchServer(Map map, INimbus iNimbus) {
        LOG.info("Begin to start nimbus with conf {}", map);
        try {
            StormConfig.validate_distributed_mode(map);
            createPid(map);
            initShutdownHook();
            iNimbus.prepare(map, StormConfig.masterInimbus(map));
            this.data = createNimbusData(map, iNimbus);
            initFollowerThread(map);

            int httpPort = ConfigExtension.getNimbusDeamonHttpserverPort(map).intValue();
            this.hs = new Httpserver(httpPort, map);
            this.hs.start();

            initContainerHBThread(map);
            this.serviceHandler = new ServiceHandler(this.data);
            // Last start-up step: initThrift ends by calling serve() on the Thrift server.
            initThrift(map);
        } catch (Throwable th) {
            if (th instanceof OutOfMemoryError) {
                LOG.error("Halting due to Out Of Memory Error...");
            }
            LOG.error("Fail to run nimbus ", th);
        } finally {
            cleanup();
        }
        LOG.info("Quit nimbus");
    }

    /**
     * Schedules a task (15 second initial delay, 15 second period) that re-reads the storm
     * configuration, and when it differs from the live one replaces the live configuration in
     * place and notifies {@code RefreshableComponents}.
     *
     * <p>This method is not invoked from anywhere inside this class.
     *
     * @param nimbusData the nimbus state whose configuration map and scheduler are used
     */
    private void mkRefreshConfThread(final NimbusData nimbusData) {
        RunnableCallback refreshTask = new RunnableCallback() {
            @Override
            public void run() {
                NimbusServer.LOG.debug("checking changes in storm.yaml...");
                Map<? extends Object, ? extends Object> latestConf = Utils.readStormConfig();
                if (!Utils.isConfigChanged(nimbusData.getConf(), latestConf)) {
                    NimbusServer.LOG.debug("no changes detected, stay put.");
                    return;
                }
                NimbusServer.LOG.warn("detected changes in storm.yaml, updating...");
                // Swap the contents while holding the map's monitor so that code synchronizing
                // on the same map does not see it between clear() and putAll().
                synchronized (nimbusData.getConf()) {
                    nimbusData.getConf().clear();
                    nimbusData.getConf().putAll(latestConf);
                }
                RefreshableComponents.refresh(latestConf);
            }

            @Override
            public Object getResult() {
                // NOTE(review): presumably mirrors the 15 second schedule period - confirm.
                return 15;
            }
        };
        nimbusData.getScheduExec().scheduleAtFixedRate(refreshTask, 15L, 15L, TimeUnit.SECONDS);
        LOG.info("Successfully init configuration refresh thread");
    }

    /**
     * Starts nimbus in local mode: validates the configuration, builds the nimbus state, runs
     * {@link #init(Map)} and creates the service handler. No Thrift server, HTTP server,
     * follower thread or shutdown hook is started on this path.
     *
     * @param map      the nimbus configuration
     * @param iNimbus  the scheduler-side nimbus implementation
     * @return the handler through which the caller talks to this nimbus
     * @throws Exception if validation or any initialization step fails
     */
    public ServiceHandler launcherLocalServer(Map map, INimbus iNimbus) throws Exception {
        LOG.info("Begin to start nimbus on local model");
        StormConfig.validate_local_mode(map);
        iNimbus.prepare(map, StormConfig.masterInimbus(map));
        this.data = createNimbusData(map, iNimbus);
        init(map);
        this.serviceHandler = new ServiceHandler(this.data);
        return this.serviceHandler;
    }

    /**
     * Creates the container heartbeat thread and registers it for cleanup;
     * {@code SyncContainerHb.mkNimbusInstance} may return {@code null}, in which case nothing
     * is registered.
     */
    private void initContainerHBThread(Map map) throws IOException {
        AsyncLoopThread containerHbThread = SyncContainerHb.mkNimbusInstance(map);
        if (containerHbThread != null) {
            this.smartThreads.add(containerHbThread);
        }
    }

    /** Wraps the cluster metrics runnable in a loop thread and registers it for cleanup. */
    private void initMetricRunnable() {
        this.smartThreads.add(new AsyncLoopThread(ClusterMetricsRunnable.getInstance()));
    }

    /**
     * Initializes the nimbus state and the background services built on top of it. Called
     * directly in local mode and from the follower callback otherwise. The monitor task is
     * only started when the state does not report local mode.
     *
     * @param map the nimbus configuration
     * @throws Exception if any initialization step fails
     */
    public void init(Map map) throws Exception {
        this.data.init();
        NimbusUtils.cleanupCorruptTopologies(this.data);
        initTopologyAssign();
        initTopologyStatus();
        initCleaner(map);
        initMetricRunnable();
        if (!this.data.isLocalMode()) {
            initMonitor(map);
        }
    }

    /** Factory for the shared nimbus state. */
    private NimbusData createNimbusData(Map map, INimbus iNimbus) throws Exception {
        return new NimbusData(map, iNimbus);
    }

    /** Obtains the {@code TopologyAssign} singleton and initializes it with the nimbus state. */
    private void initTopologyAssign() {
        this.topologyAssign = TopologyAssign.getInstance();
        this.topologyAssign.init(this.data);
    }

    /**
     * For every topology the cluster state lists as active, applies the {@code startup}
     * status transition and refreshes its task timeout and task heartbeat records.
     */
    private void initTopologyStatus() throws Exception {
        List<String> activeTopologyIds = this.data.getStormClusterState().active_storms();
        if (activeTopologyIds != null) {
            for (String topologyId : activeTopologyIds) {
                NimbusUtils.transition(this.data, topologyId, false, StatusType.startup, new Object[0]);
                NimbusUtils.updateTopologyTaskTimeout(this.data, topologyId);
                NimbusUtils.updateTopologyTaskHb(this.data, topologyId);
            }
        }
        LOG.info("Successfully init topology status");
    }

    /**
     * Schedules the {@code MonitorRunnable} at a fixed rate, unless the nimbus state says it
     * has already been launched. The period is read from
     * {@code Config.NIMBUS_MONITOR_FREQ_SECS} and defaults to 10 seconds.
     */
    private void initMonitor(Map map) {
        ScheduledExecutorService scheduler = this.data.getScheduExec();
        if (this.data.isLaunchedMonitor()) {
            LOG.info("We have launched Monitor thread before");
            return;
        }
        int monitorFreqSecs = JStormUtils.parseInt(map.get(Config.NIMBUS_MONITOR_FREQ_SECS), 10).intValue();
        scheduler.scheduleAtFixedRate(new MonitorRunnable(this.data), 0L, monitorFreqSecs, TimeUnit.SECONDS);
        this.data.setLaunchedMonitor(true);
        LOG.info("Successfully init Monitor thread");
    }

    /**
     * Schedules the {@code CleanRunnable} for the master inbox directory at a fixed rate,
     * unless the nimbus state says it has already been launched. The jar expiration defaults
     * to {@code JStormUtils.HOUR_1} and the period to 600 seconds.
     */
    private void initCleaner(Map map) throws IOException {
        ScheduledExecutorService scheduler = this.data.getScheduExec();
        if (this.data.isLaunchedCleaner()) {
            LOG.info("We have launched Cleaner thread before");
            return;
        }
        String masterInbox = StormConfig.masterInbox(map);
        int jarExpirationSecs =
                JStormUtils.parseInt(map.get(Config.NIMBUS_INBOX_JAR_EXPIRATION_SECS), JStormUtils.HOUR_1).intValue();
        int cleanupFreqSecs =
                JStormUtils.parseInt(map.get(Config.NIMBUS_CLEANUP_INBOX_FREQ_SECS), 600).intValue();
        scheduler.scheduleAtFixedRate(
                new CleanRunnable(masterInbox, jarExpirationSecs), 0L, cleanupFreqSecs, TimeUnit.SECONDS);
        this.data.setLaunchedCleaner(true);
        LOG.info("Successfully init {} cleaner", masterInbox);
    }

    /**
     * Builds the Thrift server on the configured nimbus port with 64 worker threads and calls
     * {@code serve()} on it.
     *
     * <p>NOTE(review): both {@code parseInt} calls are made without a default; presumably a
     * missing port or buffer-size entry surfaces as a {@link NullPointerException} here -
     * confirm against {@code JStormUtils.parseInt}.
     *
     * @throws TTransportException if the server socket cannot be created
     */
    private void initThrift(Map map) throws TTransportException {
        int thriftPort = JStormUtils.parseInt(map.get(Config.NIMBUS_THRIFT_PORT)).intValue();
        TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(thriftPort);
        Integer maxBufferSize = JStormUtils.parseInt(map.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE));

        THsHaServer.Args args = new THsHaServer.Args(serverSocket);
        args.workerThreads(64);
        args.protocolFactory(new TBinaryProtocol.Factory(false, true, maxBufferSize.intValue(), -1L));
        args.processor(new Nimbus.Processor(this.serviceHandler));
        args.maxReadBufferBytes = maxBufferSize.intValue();

        this.thriftServer = new THsHaServer(args);
        // Logged before serve() because launchServer starts cleanup as soon as serve() returns.
        LOG.info("Successfully started nimbus: started Thrift server...");
        this.thriftServer.serve();
    }

    /**
     * Creates the follower runnable and runs it on a daemon thread. The callback handed to the
     * follower performs the full {@link #init(Map)} with the live configuration; the log
     * message below indicates it fires when this node becomes leader.
     */
    private void initFollowerThread(Map map) {
        Callback becomeLeaderCallback = new Callback() {
            @Override
            public <T> Object execute(T... tArr) {
                try {
                    NimbusServer.this.init(NimbusServer.this.data.getConf());
                    return null;
                } catch (Exception e) {
                    NimbusServer.LOG.error("Nimbus init error after becoming a leader", e);
                    throw new RuntimeException(e);
                }
            }
        };
        this.follower = new FollowerRunnable(this.data, 5000, becomeLeaderCallback);
        Thread followerThread = new Thread(this.follower);
        followerThread.setDaemon(true);
        followerThread.start();
        LOG.info("Successfully init Follower thread");
    }

    /** Registers a JVM shutdown hook that runs {@link #cleanup()}. */
    private void initShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                NimbusServer.this.cleanup();
            }
        });
    }

    /**
     * Shuts down every component this server started and then asks {@code JStormUtils} to halt
     * the process with status 0.
     *
     * <p>The shutdown flag on the nimbus state is flipped atomically, so only the first caller
     * performs the shutdown; later callers, and callers arriving before the state exists, only
     * log and return.
     */
    public void cleanup() {
        if (this.data == null || this.data.getIsShutdown().getAndSet(true)) {
            LOG.info("Notify to quit nimbus");
            return;
        }
        LOG.info("Begin to shutdown nimbus");
        AsyncLoopRunnable.getShutdown().set(true);
        this.data.getScheduExec().shutdownNow();

        for (AsyncLoopThread loopThread : this.smartThreads) {
            loopThread.cleanup();
            // Short pause between the cleanup request and the interrupt.
            JStormUtils.sleepMs(10L);
            loopThread.interrupt();
            LOG.info("Successfully cleanup {}", loopThread.getThread().getName());
        }
        if (this.serviceHandler != null) {
            this.serviceHandler.shutdown();
        }
        if (this.topologyAssign != null) {
            this.topologyAssign.cleanup();
            LOG.info("Successfully shutdown TopologyAssign thread");
        }
        if (this.follower != null) {
            this.follower.clean();
            LOG.info("Successfully shutdown follower thread");
        }
        // data is known to be non-null here: the guard at the top returned otherwise.
        this.data.cleanup();
        LOG.info("Successfully shutdown NimbusData");
        if (this.thriftServer != null) {
            this.thriftServer.stop();
            LOG.info("Successfully shutdown thrift server");
        }
        if (this.hs != null) {
            this.hs.shutdown();
            LOG.info("Successfully shutdown httpserver");
        }
        LOG.info("Successfully shutdown nimbus");
        JStormUtils.halt_process(0, "!!!Shutdown!!!");
    }

    /**
     * Forwards a metrics upload to the service handler.
     *
     * @param str             first argument of {@code uploadTopologyMetrics}; presumably the
     *                        topology id - confirm against {@code ServiceHandler}
     * @param topologyMetric  the metrics payload
     * @throws TException if the handler rejects the upload
     */
    public void uploadMetrics(String str, TopologyMetric topologyMetric) throws TException {
        this.serviceHandler.uploadTopologyMetrics(str, topologyMetric);
    }

    /**
     * Forwards a metric registration to the service handler and returns its result unchanged.
     *
     * @param str  first argument of {@code registerMetrics}; presumably the topology id -
     *             confirm against {@code ServiceHandler}
     * @param set  the metric names to register
     * @return the name-to-long mapping produced by the handler
     * @throws TException if the handler rejects the registration
     */
    public Map<String, Long> registerMetrics(String str, Set<String> set) throws TException {
        return this.serviceHandler.registerMetrics(str, set);
    }
}
