/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.generated.Nimbus;
import backtype.storm.scheduler.INimbus;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.nimbus.DefaultInimbus;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
import com.alibaba.jstorm.daemon.nimbus.ServiceHandler;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
import com.alibaba.jstorm.daemon.nimbus.TopologyAssign;
import com.alibaba.jstorm.daemon.supervisor.Httpserver;
import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb;
import com.alibaba.jstorm.schedule.CleanRunnable;
import com.alibaba.jstorm.schedule.FollowerRunnable;
import com.alibaba.jstorm.schedule.MonitorRunnable;
import com.alibaba.jstorm.utils.DefaultUncaughtExceptionHandler;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NimbusServer {
    private static final Logger LOG = LoggerFactory.getLogger(NimbusServer.class);
    private NimbusData data;
    private ServiceHandler serviceHandler;
    private TopologyAssign topologyAssign;
    private THsHaServer thriftServer;
    private FollowerRunnable follower;
    private Httpserver hs;
    private List<AsyncLoopThread> smartThreads = new ArrayList<AsyncLoopThread>();

    public static void main(String[] args) throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(new DefaultUncaughtExceptionHandler());
        Map config = Utils.readStormConfig();
        JStormServerUtils.startTaobaoJvmMonitor();
        NimbusServer instance = new NimbusServer();
        DefaultInimbus iNimbus = new DefaultInimbus();
        instance.launchServer(config, iNimbus);
    }

    private void createPid(Map conf) throws Exception {
        String pidDir = StormConfig.masterPids(conf);
        JStormServerUtils.createPid(pidDir);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void launchServer(Map conf, INimbus inimbus) {
        LOG.info("Begin to start nimbus with conf " + conf);
        try {
            StormConfig.validate_distributed_mode(conf);
            this.createPid(conf);
            this.initShutdownHook();
            inimbus.prepare(conf, StormConfig.masterInimbus(conf));
            this.data = this.createNimbusData(conf, inimbus);
            this.initFollowerThread(conf);
            int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf);
            this.hs = new Httpserver(port, conf);
            this.hs.start();
            this.initContainerHBThread(conf);
            while (!this.data.isLeader()) {
                Utils.sleep(5000L);
            }
            this.init(conf);
        }
        catch (Throwable e) {
            if (e instanceof OutOfMemoryError) {
                LOG.error("Halting due to Out Of Memory Error...");
            }
            LOG.error("Fail to run nimbus ", e);
        }
        finally {
            this.cleanup();
        }
        LOG.info("Quit nimbus");
    }

    public ServiceHandler launcherLocalServer(Map conf, INimbus inimbus) throws Exception {
        LOG.info("Begin to start nimbus on local model");
        StormConfig.validate_local_mode(conf);
        inimbus.prepare(conf, StormConfig.masterInimbus(conf));
        this.data = this.createNimbusData(conf, inimbus);
        this.init(conf);
        return this.serviceHandler;
    }

    private void initContainerHBThread(Map conf) throws IOException {
        AsyncLoopThread thread = SyncContainerHb.mkNimbusInstance(conf);
        if (thread != null) {
            this.smartThreads.add(thread);
        }
    }

    private void init(Map conf) throws Exception {
        NimbusUtils.cleanupCorruptTopologies(this.data);
        this.initTopologyAssign();
        this.initTopologyStatus();
        this.initCleaner(conf);
        this.serviceHandler = new ServiceHandler(this.data);
        if (!this.data.isLocalMode()) {
            this.initMonitor(conf);
            this.initThrift(conf);
        }
    }

    private NimbusData createNimbusData(Map conf, INimbus inimbus) throws Exception {
        return new NimbusData(conf, inimbus);
    }

    private void initTopologyAssign() {
        this.topologyAssign = TopologyAssign.getInstance();
        this.topologyAssign.init(this.data);
    }

    private void initTopologyStatus() throws Exception {
        List<String> active_ids = this.data.getStormClusterState().active_storms();
        if (active_ids != null) {
            for (String topologyid : active_ids) {
                NimbusUtils.transition(this.data, topologyid, false, StatusType.startup, new Object[0]);
                NimbusUtils.updateTopologyTaskTimeout(this.data, topologyid);
                NimbusUtils.updateTopologyTaskHb(this.data, topologyid);
            }
        }
        LOG.info("Successfully init topology status");
    }

    private void initMonitor(Map conf) {
        ScheduledExecutorService scheduExec = this.data.getScheduExec();
        MonitorRunnable r1 = new MonitorRunnable(this.data);
        int monitor_freq_secs = JStormUtils.parseInt(conf.get("nimbus.monitor.freq.secs"), 10);
        scheduExec.scheduleAtFixedRate(r1, 0L, monitor_freq_secs, TimeUnit.SECONDS);
        LOG.info("Successfully init Monitor thread");
    }

    private void initCleaner(Map conf) throws IOException {
        ScheduledExecutorService scheduExec = this.data.getScheduExec();
        String dir_location = StormConfig.masterInbox(conf);
        int inbox_jar_expiration_secs = JStormUtils.parseInt(conf.get("nimbus.inbox.jar.expiration.secs"), 3600);
        CleanRunnable r2 = new CleanRunnable(dir_location, inbox_jar_expiration_secs);
        int cleanup_inbox_freq_secs = JStormUtils.parseInt(conf.get("nimbus.cleanup.inbox.freq.secs"), 600);
        scheduExec.scheduleAtFixedRate(r2, 0L, cleanup_inbox_freq_secs, TimeUnit.SECONDS);
        LOG.info("Successfully init " + dir_location + " cleaner");
    }

    private void initThrift(Map conf) throws TTransportException {
        Integer thrift_port = JStormUtils.parseInt(conf.get("nimbus.thrift.port"));
        TNonblockingServerSocket socket = new TNonblockingServerSocket(thrift_port.intValue());
        Integer maxReadBufSize = JStormUtils.parseInt(conf.get("nimbus.thrift.max_buffer_size"));
        THsHaServer.Args args = new THsHaServer.Args((TNonblockingServerTransport)socket);
        args.workerThreads(64);
        args.protocolFactory((TProtocolFactory)new TBinaryProtocol.Factory(false, true, (long)maxReadBufSize.intValue(), -1L));
        args.processor(new Nimbus.Processor<ServiceHandler>(this.serviceHandler));
        args.maxReadBufferBytes = maxReadBufSize.intValue();
        this.thriftServer = new THsHaServer(args);
        LOG.info("Successfully started nimbus: started Thrift server...");
        this.thriftServer.serve();
    }

    private void initFollowerThread(Map conf) {
        this.follower = new FollowerRunnable(this.data, 5000);
        Thread thread = new Thread(this.follower);
        thread.setDaemon(true);
        thread.start();
        LOG.info("Successfully init Follower thread");
    }

    private void initShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                NimbusServer.this.cleanup();
            }
        });
    }

    public void cleanup() {
        if (this.data.getIsShutdown().getAndSet(true)) {
            LOG.info("Notify to quit nimbus");
            return;
        }
        LOG.info("Begin to shutdown nimbus");
        AsyncLoopRunnable.getShutdown().set(true);
        this.data.getScheduExec().shutdownNow();
        for (AsyncLoopThread t : this.smartThreads) {
            t.cleanup();
            JStormUtils.sleepMs(10L);
            t.interrupt();
            LOG.info("Successfully cleanup " + t.getThread().getName());
        }
        if (this.serviceHandler != null) {
            this.serviceHandler.shutdown();
        }
        if (this.topologyAssign != null) {
            this.topologyAssign.cleanup();
            LOG.info("Successfully shutdown TopologyAssign thread");
        }
        if (this.follower != null) {
            this.follower.clean();
            LOG.info("Successfully shutdown follower thread");
        }
        if (this.data != null) {
            this.data.cleanup();
            LOG.info("Successfully shutdown NimbusData");
        }
        if (this.thriftServer != null) {
            this.thriftServer.stop();
            LOG.info("Successfully shutdown thrift server");
        }
        if (this.hs != null) {
            this.hs.shutdown();
            LOG.info("Successfully shutdown httpserver");
        }
        LOG.info("Successfully shutdown nimbus");
        JStormUtils.halt_process(0, "!!!Shutdown!!!");
    }
}

