/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.supervisor;

import backtype.storm.messaging.IContext;
import backtype.storm.utils.LocalState;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.supervisor.Heartbeat;
import com.alibaba.jstorm.daemon.supervisor.Httpserver;
import com.alibaba.jstorm.daemon.supervisor.MachineCheckStatus;
import com.alibaba.jstorm.daemon.supervisor.SupervisorHealth;
import com.alibaba.jstorm.daemon.supervisor.SupervisorManger;
import com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent;
import com.alibaba.jstorm.daemon.supervisor.SyncSupervisorEvent;
import com.alibaba.jstorm.daemon.worker.WorkerReportError;
import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb;
import com.alibaba.jstorm.event.EventManagerImp;
import com.alibaba.jstorm.event.EventManagerPusher;
import com.alibaba.jstorm.utils.DefaultUncaughtExceptionHandler;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import java.io.File;
import java.util.Map;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Supervisor {
    private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
    volatile MachineCheckStatus checkStatus = new MachineCheckStatus();

    public SupervisorManger mkSupervisor(Map conf, IContext sharedContext) throws Exception {
        LOG.info("Starting Supervisor with conf " + conf);
        String path = StormConfig.supervisorTmpDir(conf);
        FileUtils.cleanDirectory((File)new File(path));
        StormClusterState stormClusterState = Cluster.mk_storm_cluster_state(conf);
        String hostName = JStormServerUtils.getHostName(conf);
        WorkerReportError workerReportError = new WorkerReportError(stormClusterState, hostName);
        LocalState localState = StormConfig.supervisorState(conf);
        String supervisorId = (String)localState.get("supervisor-id");
        if (supervisorId == null) {
            supervisorId = UUID.randomUUID().toString();
            localState.put("supervisor-id", supervisorId);
        }
        localState.remove("local-zk-assignments");
        localState.remove("lcoal-zk-assignment-version");
        Vector<AsyncLoopThread> threads = new Vector<AsyncLoopThread>();
        Heartbeat hb = new Heartbeat(conf, stormClusterState, supervisorId, localState, this.checkStatus);
        hb.update();
        AsyncLoopThread heartbeat = new AsyncLoopThread(hb, false, null, 1, true);
        threads.add(heartbeat);
        AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkSupervisorInstance(conf);
        if (syncContainerHbThread != null) {
            threads.add(syncContainerHbThread);
        }
        ConcurrentHashMap<String, String> workerThreadPids = new ConcurrentHashMap<String, String>();
        SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorId, conf, localState, workerThreadPids, sharedContext, workerReportError);
        EventManagerImp syncSupEventManager = new EventManagerImp();
        AsyncLoopThread syncSupEventThread = new AsyncLoopThread(syncSupEventManager);
        threads.add(syncSupEventThread);
        SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorId, conf, syncSupEventManager, stormClusterState, localState, syncProcessEvent, hb);
        int syncFrequence = JStormUtils.parseInt(conf.get("supervisor.monitor.frequency.secs"));
        EventManagerPusher syncSupervisorPusher = new EventManagerPusher(syncSupEventManager, syncSupervisorEvent, syncFrequence);
        AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(syncSupervisorPusher);
        threads.add(syncSupervisorThread);
        Httpserver httpserver = null;
        if (!StormConfig.local_mode(conf)) {
            int port = ConfigExtension.getSupervisorDeamonHttpserverPort(conf);
            httpserver = new Httpserver(port, conf);
            httpserver.start();
        }
        if (!StormConfig.local_mode(conf) && ConfigExtension.isEnableCheckSupervisor(conf)) {
            SupervisorHealth supervisorHealth = new SupervisorHealth(conf, this.checkStatus, supervisorId);
            AsyncLoopThread healthThread = new AsyncLoopThread(supervisorHealth, false, null, 1, true);
            threads.add(healthThread);
        }
        return new SupervisorManger(conf, supervisorId, threads, syncSupEventManager, httpserver, stormClusterState, workerThreadPids);
    }

    public void killSupervisor(SupervisorManger supervisor) {
        supervisor.shutdown();
    }

    private void initShutdownHook(SupervisorManger supervisor) {
        Runtime.getRuntime().addShutdownHook(new Thread(supervisor));
    }

    private void createPid(Map conf) throws Exception {
        String pidDir = StormConfig.supervisorPids(conf);
        JStormServerUtils.createPid(pidDir);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        try {
            Map conf = Utils.readStormConfig();
            StormConfig.validate_distributed_mode(conf);
            this.createPid(conf);
            SupervisorManger supervisorManager = this.mkSupervisor(conf, null);
            JStormUtils.redirectOutput("/dev/null");
            this.initShutdownHook(supervisorManager);
            while (!supervisorManager.isFinishShutdown()) {
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
        catch (Throwable e) {
            if (e instanceof OutOfMemoryError) {
                LOG.error("Halting due to Out Of Memory Error...");
            }
            LOG.error("Fail to run supervisor ", e);
            System.exit(1);
        }
        finally {
            LOG.info("Shutdown supervisor!!!");
        }
    }

    public static void main(String[] args) {
        Thread.setDefaultUncaughtExceptionHandler(new DefaultUncaughtExceptionHandler());
        JStormServerUtils.startTaobaoJvmMonitor();
        Supervisor instance = new Supervisor();
        instance.run();
    }
}

