/*
 * Decompiled with CFR 0.152.
 */
package backtype.storm;

import backtype.storm.LocalClusterMap;
import backtype.storm.messaging.IContext;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.daemon.nimbus.DefaultInimbus;
import com.alibaba.jstorm.daemon.nimbus.NimbusServer;
import com.alibaba.jstorm.daemon.supervisor.Supervisor;
import com.alibaba.jstorm.message.netty.NettyContext;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.zk.Factory;
import com.alibaba.jstorm.zk.Zookeeper;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for assembling an in-process ("local mode") cluster: an
 * in-process ZooKeeper, a local Nimbus server and a supervisor, each given
 * its own temporary directory path.
 */
public class LocalUtils {
    public static final Logger LOG = LoggerFactory.getLogger(LocalUtils.class);

    /**
     * Starts an in-process ZooKeeper, a local Nimbus server and a supervisor and
     * bundles them into a {@link LocalClusterMap}.
     *
     * <p>Every resource is registered on the returned state object as soon as it
     * exists, so that the {@code state.clean()} call in the failure path can see
     * whatever had already been started when the error occurred.
     *
     * @return the populated cluster state, or {@code null} if any step failed
     *         (the error is logged and {@code state.clean()} is invoked)
     */
    public static LocalClusterMap prepareLocalCluster() {
        LocalClusterMap state = new LocalClusterMap();
        try {
            // Only path strings are computed here; nothing touches the file system yet.
            String zkDir = LocalUtils.getTmpDir();
            String nimbusDir = LocalUtils.getTmpDir();
            String supervisorDir = LocalUtils.getTmpDir();

            ArrayList<String> tmpDirs = new ArrayList<String>();
            tmpDirs.add(zkDir);
            tmpDirs.add(nimbusDir);
            tmpDirs.add(supervisorDir);
            // Registered before anything is launched so a later failure still lets
            // clean() know about the directories.
            // NOTE(review): assumes clean() tolerates paths that were never created - confirm.
            state.setTmpDir(tmpDirs);

            Factory zookeeper = LocalUtils.startLocalZookeeper(zkDir);
            // Hand the running ZooKeeper to the state immediately; previously it was
            // attached only after Nimbus had launched, so a Nimbus start-up failure
            // left it unreachable from clean().
            state.setZookeeper(zookeeper);

            Map conf = LocalUtils.getLocalConf(zookeeper.getZooKeeperServer().getClientPort());
            state.setConf(conf);

            Map nimbusConf = LocalUtils.deepCopyMap(conf);
            nimbusConf.put("storm.local.dir", nimbusDir);

            Map supervisorConf = LocalUtils.deepCopyMap(conf);
            supervisorConf.put("storm.local.dir", supervisorDir);
            IContext context = LocalUtils.getLocalContext(supervisorConf);

            NimbusServer instance = new NimbusServer();
            state.setNimbusServer(instance);
            state.setNimbus(instance.launcherLocalServer(nimbusConf, new DefaultInimbus()));

            Supervisor supervisor = new Supervisor();
            state.setSupervisor(supervisor.mkSupervisor(supervisorConf, context));
            return state;
        }
        catch (Exception e) {
            LOG.error("prepare cluster error!", e);
            state.clean();
            return null;
        }
    }

    /**
     * Tries ports 2000 through 65534 in ascending order and returns the first
     * in-process ZooKeeper that starts successfully.
     *
     * @param tmpDir directory path handed to {@code Zookeeper.mkInprocessZookeeper}
     * @return the factory returned by the first successful launch
     * @throws RuntimeException if every port in the range failed; the failure
     *         from the last attempted port is attached as the cause
     */
    public static Factory startLocalZookeeper(String tmpDir) {
        Exception lastFailure = null;
        for (int port = 2000; port < 65535; ++port) {
            try {
                return Zookeeper.mkInprocessZookeeper(tmpDir, port);
            }
            catch (Exception e) {
                LOG.error("fail to launch zookeeper at port: " + port, e);
                // Remember the most recent failure so the final exception is not cause-less.
                lastFailure = e;
            }
        }
        throw new RuntimeException("No port is available to launch an inprocess zookeeper.", lastFailure);
    }

    /**
     * Builds a fresh path under the {@code java.io.tmpdir} system property with a
     * random UUID as its last segment. The directory itself is not created here.
     *
     * @return the generated path string
     */
    public static String getTmpDir() {
        return System.getProperty("java.io.tmpdir") + File.separator + UUID.randomUUID();
    }

    /**
     * Switches {@link JStormUtils} into local mode and returns the baseline
     * configuration entries used for a local cluster.
     *
     * @return a new mutable map holding the local-mode settings
     */
    public static Map getLocalBaseConf() {
        // Side effect: flips the local-mode flag held by JStormUtils.
        JStormUtils.setLocalMode(true);
        HashMap<String, Object> conf = new HashMap<String, Object>();
        conf.put("storm.cluster.mode", "local");
        conf.put("topology.skip.missing.kryo.registrations", true);
        conf.put("zmq.linger.millis", 0);
        conf.put("topology.enable.message.timeouts", false);
        conf.put("topology.trident.batch.emit.interval.millis", 50);
        ConfigExtension.setSpoutDelayRunSeconds(conf, 0);
        ConfigExtension.setTaskCleanupTimeoutSec(conf, 0);
        ConfigExtension.setTopologyDebugRecvTuple(conf, true);
        conf.put("topology.debug", true);
        conf.put(ConfigExtension.TOPOLOGY_BACKPRESSURE_ENABLE, false);
        return conf;
    }

    /**
     * Reads the storm configuration, overlays the local-mode baseline from
     * {@link #getLocalBaseConf()} and points ZooKeeper at {@code localhost} on
     * the given port.
     *
     * @param port client port of the ZooKeeper instance to connect to
     * @return the combined configuration map
     */
    public static Map getLocalConf(int port) {
        Map conf = Utils.readStormConfig();
        conf.putAll(LocalUtils.getLocalBaseConf());
        ArrayList<String> zkServers = new ArrayList<String>(1);
        zkServers.add("localhost");
        conf.put("storm.zookeeper.servers", zkServers);
        conf.put("storm.zookeeper.port", port);
        return conf;
    }

    /**
     * Creates the messaging context for the local supervisor.
     *
     * <p>Unless {@code storm.local.mode.zmq} is set to {@code Boolean.TRUE}, a
     * {@link NettyContext} is prepared with the local worker port set to 6800
     * (written into {@code conf}). A missing or non-{@code Boolean} value is
     * treated as "ZMQ disabled" instead of failing with a
     * {@code NullPointerException} or {@code ClassCastException}.
     *
     * @param conf supervisor configuration; mutated when the Netty context is chosen
     * @return the prepared Netty context, or {@code null} when ZMQ mode is enabled
     */
    private static IContext getLocalContext(Map conf) {
        boolean zmqEnabled = Boolean.TRUE.equals(conf.get("storm.local.mode.zmq"));
        if (zmqEnabled) {
            return null;
        }
        NettyContext nettyContext = new NettyContext();
        ConfigExtension.setLocalWorkerPort(conf, 6800);
        nettyContext.prepare(conf);
        return nettyContext;
    }

    /**
     * Returns a new {@link HashMap} holding the same entries as {@code map}.
     *
     * <p>Despite the name this is a shallow copy: keys and values are shared with
     * the source map, only the top-level map object is new.
     *
     * @param map the map to copy
     * @return an independent top-level map with the same key/value references
     */
    private static Map deepCopyMap(Map map) {
        return new HashMap(map);
    }
}

