/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.supervisor;

import backtype.storm.messaging.IContext;
import backtype.storm.utils.LocalState;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.supervisor.CgroupManager;
import com.alibaba.jstorm.daemon.supervisor.SandBoxMaker;
import com.alibaba.jstorm.daemon.supervisor.ShutdownWork;
import com.alibaba.jstorm.daemon.supervisor.StateHeartbeat;
import com.alibaba.jstorm.daemon.worker.LocalAssignment;
import com.alibaba.jstorm.daemon.worker.ProcessSimulator;
import com.alibaba.jstorm.daemon.worker.State;
import com.alibaba.jstorm.daemon.worker.Worker;
import com.alibaba.jstorm.daemon.worker.WorkerHeartbeat;
import com.alibaba.jstorm.daemon.worker.WorkerReportError;
import com.alibaba.jstorm.daemon.worker.WorkerShutdown;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.Pair;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.ProcessLauncher;
import com.alibaba.jstorm.utils.TimeFormat;
import com.alibaba.jstorm.utils.TimeUtils;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SyncProcessEvent
extends ShutdownWork {
    /** Class-wide SLF4J logger. */
    private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
    /** Supervisor-local state store; read and written under the "task-cleanup-timeout" key during a sync round. */
    private LocalState localState;
    /** Supervisor configuration handed in by the constructor. */
    private Map conf;
    /** Worker id -> simulated process id; filled when a worker is launched in local mode. */
    private ConcurrentHashMap<String, String> workerThreadPids;
    /** Id of the supervisor that owns this event. */
    private String supervisorId;
    /** Messaging context passed on to workers that are created in-process. */
    private IContext sharedContext;
    /** Only created when cgroup support is enabled in the configuration; otherwise stays null. */
    private CgroupManager cgroupManager;
    /** Produces the sandbox policy parameter appended to the worker command line. */
    private SandBoxMaker sandBoxMaker;
    /** Worker id -> (launch time in seconds, port) for workers that were launched but not yet confirmed. */
    private Map<String, Pair<Integer, Integer>> workerIdToStartTimeAndPort;
    /** Port -> worker id; reverse index kept in step with {@code workerIdToStartTimeAndPort}. */
    private Map<Integer, String> portToWorkerId = new HashMap<Integer, String>();
    /** Topology ids computed by {@code checkNeedUpdateTopologys}; exposed through {@code getTopologyIdNeedDownload}. */
    private AtomicReference<Set> needDownloadTopologys;
    /** Passed to shutWorker on every round; presumably worker id -> kill bookkeeping (TODO confirm in ShutdownWork). */
    private Map<String, Integer> killingWorkers;
    /** Time (seconds) at which the previous sync round started; used for the interval debug log. */
    private int lastTime;
    /** Sink for worker error reports (timeout, dead process, launch failure). */
    private WorkerReportError workerReportError;

    /**
     * Creates the process-sync event for one supervisor.
     *
     * @param supervisorId      id of the owning supervisor
     * @param conf              supervisor configuration
     * @param localState        supervisor-local state store
     * @param workerThreadPids  shared map of worker id to simulated process id
     * @param sharedContext     messaging context handed to in-process workers
     * @param workerReportError sink used to report worker failures
     */
    public SyncProcessEvent(String supervisorId, Map conf, LocalState localState, ConcurrentHashMap<String, String> workerThreadPids, IContext sharedContext, WorkerReportError workerReportError) {
        // Plain references handed in by the caller.
        this.conf = conf;
        this.supervisorId = supervisorId;
        this.localState = localState;
        this.sharedContext = sharedContext;
        this.workerThreadPids = workerThreadPids;
        this.workerReportError = workerReportError;

        // Bookkeeping owned by this instance.
        this.workerIdToStartTimeAndPort = new HashMap<String, Pair<Integer, Integer>>();
        this.killingWorkers = new HashMap<String, Integer>();
        this.needDownloadTopologys = new AtomicReference();

        // Helpers built from the configuration; the cgroup manager exists only when enabled.
        this.sandBoxMaker = new SandBoxMaker(conf);
        this.cgroupManager = ConfigExtension.isEnableCgroup(conf) ? new CgroupManager(conf) : null;
    }

    /**
     * No-argument variant with an empty body; the synchronization work in this
     * class is implemented by {@link #run(Map, Set)}.
     */
    @Override
    public void run() {
    }

    /**
     * Runs one synchronization round: collects the state of local workers, shuts down
     * the ones that are no longer valid, settles workers that were launched earlier,
     * recomputes the topologies whose binaries should be downloaded again and finally
     * launches workers for assigned ports that have no worker yet.
     *
     * <p>Every failure is logged and swallowed here, so this method never throws.
     *
     * @param localAssignments          port -> assignment for this supervisor; {@code null} is treated as empty
     * @param downloadFailedTopologyIds topologies whose binaries could not be downloaded; no worker is started for them
     */
    public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds) {
        LOG.debug("Syncing processes, interval seconds:" + TimeUtils.time_delta(this.lastTime));
        this.lastTime = TimeUtils.current_time_secs();
        try {
            if (localAssignments == null) {
                localAssignments = new HashMap<Integer, LocalAssignment>();
            }
            LOG.debug("Assigned tasks: " + localAssignments);

            Map<String, StateHeartbeat> localWorkerStats;
            try {
                localWorkerStats = this.getLocalWorkerStats(this.conf, this.localState, localAssignments);
            }
            catch (Exception e) {
                LOG.error("Failed to get Local worker stats");
                throw e;
            }
            LOG.debug("Allocated: " + localWorkerStats);

            Set<Integer> keepPorts = null;
            try {
                Map taskCleaupTimeoutMap = (Map)this.localState.get("task-cleanup-timeout");
                keepPorts = this.killUselessWorkers(localWorkerStats, localAssignments, taskCleaupTimeoutMap);
                this.localState.put("task-cleanup-timeout", taskCleaupTimeoutMap);
            }
            catch (IOException e) {
                LOG.error("Failed to kill workers", (Throwable)e);
            }

            this.checkNewWorkers(this.conf);
            this.checkNeedUpdateTopologys(localWorkerStats, localAssignments);

            if (keepPorts == null) {
                // The set of ports still in use could not be determined (the step above
                // failed before computing it). Without it we cannot tell which assigned
                // ports are free, so no worker is launched in this round.
                LOG.error("Skip starting new workers in this round, ports in use are unknown");
                return;
            }
            this.startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds);
        }
        catch (Exception e) {
            LOG.error("Failed Sync Process", (Throwable)e);
        }
    }

    /**
     * Recomputes the set of topologies whose binaries should be downloaded again and
     * stores it in {@code needDownloadTopologys}.
     *
     * <p>A topology is a candidate when it is assigned to this supervisor but no local
     * worker in a state other than {@code notStarted} reports it. Candidates whose
     * binaries were modified less than 120 seconds ago, or whose modification time
     * cannot be read, are dropped from the result.
     *
     * @param localWorkerStats worker id -> state and heartbeat of local workers
     * @param localAssignments port -> assignment for this supervisor
     * @throws Exception declared by the original signature
     */
    public void checkNeedUpdateTopologys(Map<String, StateHeartbeat> localWorkerStats, Map<Integer, LocalAssignment> localAssignments) throws Exception {
        HashSet<String> topologies = new HashSet<String>();
        for (LocalAssignment assignment : localAssignments.values()) {
            topologies.add(assignment.getTopologyId());
        }

        for (StateHeartbeat stateHb : localWorkerStats.values()) {
            if (stateHb.getState().equals((Object)State.notStarted)) {
                continue;
            }
            // A worker can be recorded with a non-notStarted state and no heartbeat
            // (see getLocalWorkerStats); such an entry names no topology, so skip it
            // instead of failing with a NullPointerException.
            if (stateHb.getHeartbeat() == null) {
                continue;
            }
            topologies.remove(stateHb.getHeartbeat().getTopologyId());
        }

        long currTime = System.currentTimeMillis();
        HashSet<String> tooRecentOrUnknown = new HashSet<String>();
        for (String topologyId : topologies) {
            try {
                long lastModifytime = StormConfig.get_supervisor_topology_Bianrymodify_time(this.conf, topologyId);
                if ((currTime - lastModifytime) / 1000L < 120L) {
                    LOG.debug("less 2 miniute ,so removed " + topologyId);
                    tooRecentOrUnknown.add(topologyId);
                }
            }
            catch (Exception e) {
                LOG.error("Failed to get the time of file last modification for topology" + topologyId, (Throwable)e);
                tooRecentOrUnknown.add(topologyId);
            }
        }
        topologies.removeAll(tooRecentOrUnknown);

        if (topologies.size() > 0) {
            LOG.debug("Following topologies are going to re-download the jars, " + topologies);
        }
        this.needDownloadTopologys.set(topologies);
    }

    /**
     * Records freshly launched workers so later rounds can tell whether they came up.
     * If a port is still mapped to an earlier worker id, that older id is dropped from
     * the start-time bookkeeping first.
     *
     * @param workerIds port -> id of the worker just launched on that port
     */
    public void markAllNewWorkers(Map<Integer, String> workerIds) {
        final int launchedAt = TimeUtils.current_time_secs();
        for (Map.Entry<Integer, String> launched : workerIds.entrySet()) {
            Integer port = launched.getKey();
            String workerId = launched.getValue();

            // put() hands back the id that previously occupied this port, if any.
            String staleWorkerId = this.portToWorkerId.put(port, workerId);
            if (staleWorkerId != null) {
                this.workerIdToStartTimeAndPort.remove(staleWorkerId);
                LOG.info("exit port is still occupied by old workerId, so remove useless " + staleWorkerId + " form workerIdToStartTimeAndPort");
            }
            this.workerIdToStartTimeAndPort.put(workerId, new Pair<Integer, Integer>(launchedAt, port));
        }
    }

    /**
     * Settles workers recorded by {@link #markAllNewWorkers(Map)}: a worker whose
     * heartbeat is present, or whose start timeout
     * ("supervisor.worker.start.timeout.secs") has elapsed, is removed from the
     * bookkeeping maps; one still inside its timeout is left untouched.
     *
     * @param conf configuration used to locate worker state and read the start timeout
     * @throws IOException          declared by the original signature
     * @throws InterruptedException declared by the original signature
     */
    public void checkNewWorkers(Map conf) throws IOException, InterruptedException {
        HashSet<String> settled = new HashSet<String>();
        for (Map.Entry<String, Pair<Integer, Integer>> tracked : this.workerIdToStartTimeAndPort.entrySet()) {
            String workerId = tracked.getKey();
            int launchedAt = tracked.getValue().getFirst();
            LocalState workerState = StormConfig.worker_state(conf, workerId);
            WorkerHeartbeat heartbeat = (WorkerHeartbeat)workerState.get("worker-heartbeat");

            if (heartbeat != null) {
                LOG.info("Successfully start worker " + workerId);
                settled.add(workerId);
            } else if (TimeUtils.current_time_secs() - launchedAt < JStormUtils.parseInt(conf.get("supervisor.worker.start.timeout.secs"))) {
                LOG.info(workerId + " still hasn't started");
            } else {
                LOG.error("Failed to start Worker " + workerId);
                settled.add(workerId);
            }
        }

        // Removal happens after the scan so the map is not modified while iterated.
        for (String workerId : settled) {
            Integer port = this.workerIdToStartTimeAndPort.get(workerId).getSecond();
            this.workerIdToStartTimeAndPort.remove(workerId);
            this.portToWorkerId.remove(port);
        }
    }

    /**
     * Returns the internal port -> worker id map itself (not a copy), so changes made
     * by the caller are visible to this object.
     */
    public Map<Integer, String> getPortToWorkerId() {
        return this.portToWorkerId;
    }

    /**
     * Classifies every worker directory found on this supervisor.
     *
     * <ul>
     *   <li>no heartbeat: {@code notStarted}, or {@code disallowed} when the worker is
     *       tracked as launched but its port has no assignment any more (the tracking
     *       entries are removed in that case);</li>
     *   <li>heartbeat does not match the assignment: {@code disallowed};</li>
     *   <li>heartbeat older than "supervisor.worker.timeout.secs", or the process is
     *       found dead: {@code timedOut} (reported unless the worker is already being
     *       killed);</li>
     *   <li>otherwise: {@code valid}.</li>
     * </ul>
     *
     * @param conf          configuration used to read heartbeats and the timeout
     * @param localState    not used by this method
     * @param assignedTasks port -> assignment for this supervisor
     * @return worker id -> state and heartbeat (heartbeat may be null)
     * @throws Exception if reading the worker heartbeats fails
     */
    public Map<String, StateHeartbeat> getLocalWorkerStats(Map conf, LocalState localState, Map<Integer, LocalAssignment> assignedTasks) throws Exception {
        int now = TimeUtils.current_time_secs();
        HashMap<String, StateHeartbeat> statsByWorkerId = new HashMap<String, StateHeartbeat>();

        for (Map.Entry<String, WorkerHeartbeat> hbEntry : this.readWorkerHeartbeats(conf).entrySet()) {
            String workerId = hbEntry.getKey();
            WorkerHeartbeat whb = hbEntry.getValue();
            boolean beingKilled = this.killingWorkers.containsKey(workerId);

            State state;
            if (whb == null) {
                state = State.notStarted;
                Pair<Integer, Integer> launchInfo = this.workerIdToStartTimeAndPort.get(workerId);
                if (launchInfo != null && assignedTasks.get(launchInfo.getSecond()) == null) {
                    Integer port = launchInfo.getSecond();
                    LOG.info("Following worker don't exit assignment, so remove this port=" + port);
                    state = State.disallowed;
                    this.workerIdToStartTimeAndPort.remove(workerId);
                    this.portToWorkerId.remove(port);
                }
            } else if (!this.matchesAssignment(whb, assignedTasks)) {
                state = State.disallowed;
            } else if (now - whb.getTimeSecs() > JStormUtils.parseInt(conf.get("supervisor.worker.timeout.secs"))) {
                if (!beingKilled) {
                    this.workerReportError.report(whb.getTopologyId(), whb.getPort(), whb.getTaskIds(), " it is likely to be out of memory, the worker is time out ", 501);
                }
                state = State.timedOut;
            } else if (this.isWorkerDead(workerId)) {
                if (!beingKilled) {
                    this.workerReportError.report(whb.getTopologyId(), whb.getPort(), whb.getTaskIds(), "Worker is dead ", 500);
                }
                state = State.timedOut;
            } else {
                state = State.valid;
            }

            if (state == State.valid) {
                LOG.debug("Worker:" + workerId + " state:" + (Object)state + " WorkerHeartbeat: " + whb + " at supervisor time-secs " + now);
            } else if (!beingKilled) {
                LOG.info("Worker:" + workerId + " state:" + (Object)state + " WorkerHeartbeat:" + whb + " assignedTasks:" + assignedTasks + " at supervisor time-secs " + now);
            }
            statsByWorkerId.put(workerId, new StateHeartbeat(state, whb));
        }
        return statsByWorkerId;
    }

    /**
     * Tells whether a worker heartbeat still corresponds to the current assignment.
     *
     * @param whb           heartbeat of the worker
     * @param assignedTasks port -> assignment for this supervisor
     * @return {@code true} when the heartbeat's port is assigned and the assignment
     *         names the same topology as the heartbeat
     */
    public boolean matchesAssignment(WorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedTasks) {
        LocalAssignment localAssignment = assignedTasks.get(whb.getPort());
        if (localAssignment == null) {
            LOG.debug("Following worker has been removed, port=" + whb.getPort() + ", assignedTasks=" + assignedTasks);
            return false;
        }
        if (!whb.getTopologyId().equals(localAssignment.getTopologyId())) {
            LOG.info("topology id not equal whb=" + whb.getTopologyId() + ",localAssignment=" + localAssignment.getTopologyId());
            return false;
        }
        return true;
    }

    /**
     * Reads the heartbeat of every worker directory under the worker root.
     *
     * @param conf configuration used to locate the worker root
     * @return worker id -> heartbeat; the value is null when no heartbeat could be
     *         read; the map is empty when the directory listing is null
     * @throws Exception if the worker root cannot be resolved or listed
     */
    public Map<String, WorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception {
        HashMap<String, WorkerHeartbeat> heartbeatsById = new HashMap<String, WorkerHeartbeat>();
        String workerRoot = StormConfig.worker_root(conf);
        List<String> workerIds = PathUtils.read_dir_contents(workerRoot);
        if (workerIds != null) {
            for (String workerId : workerIds) {
                heartbeatsById.put(workerId, this.readWorkerHeartbeat(conf, workerId));
            }
        } else {
            LOG.info("No worker dir under " + workerRoot);
        }
        return heartbeatsById;
    }

    /**
     * Reads the "worker-heartbeat" entry from one worker's local state.
     *
     * @param conf     configuration used to locate the worker state
     * @param workerId id of the worker
     * @return the stored heartbeat, or {@code null} when none is stored or reading
     *         failed (the failure is logged, not propagated)
     * @throws Exception declared by the original signature
     */
    public WorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) throws Exception {
        WorkerHeartbeat heartbeat = null;
        try {
            heartbeat = (WorkerHeartbeat)StormConfig.worker_state(conf, workerId).get("worker-heartbeat");
        }
        catch (Exception e) {
            LOG.error("Failed to get worker Heartbeat", (Throwable)e);
        }
        return heartbeat;
    }

    /**
     * Starts a worker inside the current JVM: the worker is created, registered with
     * {@link ProcessSimulator} under a random id, and that id is stored for the worker.
     *
     * @param conf                 configuration for the worker
     * @param sharedcontext        messaging context shared with the worker
     * @param topologyId           topology the worker belongs to
     * @param supervisorId         id of the launching supervisor
     * @param port                 port assigned to the worker
     * @param workerId             id of the new worker
     * @param workerThreadPidsAtom map receiving worker id -> simulated process id
     * @throws Exception if the worker cannot be created
     */
    public void launchWorker(Map conf, IContext sharedcontext, String topologyId, String supervisorId, Integer port, String workerId, ConcurrentHashMap<String, String> workerThreadPidsAtom) throws Exception {
        String simulatedPid = UUID.randomUUID().toString();
        WorkerShutdown workerHandle = Worker.mk_worker(conf, sharedcontext, topologyId, supervisorId, port, workerId, null);
        ProcessSimulator.registerProcess(simulatedPid, workerHandle);
        workerThreadPidsAtom.put(workerId, simulatedPid);
    }

    /**
     * Builds the set of jar name prefixes that must be left out of the worker class path.
     *
     * <p>When the topology class loader is disabled, one logging backend is filtered:
     * the logback/log4j-over-slf4j jars if a user log4j configuration is set, the
     * log4j jars otherwise. Entries of the comma separated "exclude.jars" setting are
     * always added.
     *
     * @param totalConf merged configuration
     * @return jar name prefixes to filter
     */
    private Set<String> setFilterJars(Map totalConf) {
        HashSet<String> filterJars = new HashSet<String>();

        if (!ConfigExtension.isEnableTopologyClassLoader(totalConf)) {
            String userDefLog4jConf = ConfigExtension.getUserDefinedLog4jConf(totalConf);
            if (StringUtils.isBlank((String)userDefLog4jConf)) {
                filterJars.add("slf4j-log4j");
                filterJars.add("log4j");
            } else {
                filterJars.add("log4j-over-slf4j");
                filterJars.add("logback-core");
                filterJars.add("logback-classic");
            }
        }

        String excludeJars = (String)totalConf.get("exclude.jars");
        if (!StringUtils.isBlank((String)excludeJars)) {
            for (String jar : excludeJars.split(",")) {
                filterJars.add(jar);
            }
        }

        LOG.info("Remove jars " + filterJars);
        return filterJars;
    }

    /**
     * Tells whether the file name of {@code jar} matches one of the given jar name
     * prefixes, i.e. has the form {@code <prefix><version chars>.jar} where the
     * version part consists only of digits, '-', '.' and '_'.
     *
     * <p>Each prefix is used as a regular expression fragment, as before. The dot in
     * front of {@code jar} is now matched literally; previously it matched any
     * character, so names such as {@code log4j-1.2xjar} were accepted.
     *
     * @param collection jar name prefixes; {@code null} yields {@code false}
     * @param jar        path or file name of the jar to test
     * @return {@code true} if any prefix matches the jar's file name
     */
    public static boolean isKeyContain(Collection<String> collection, String jar) {
        if (collection == null) {
            return false;
        }
        String fileName = new File(jar).getName();
        for (String item : collection) {
            if (Pattern.matches(item + "[-._0-9]*\\.jar", fileName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the reference holding the topology ids last computed by
     * {@code checkNeedUpdateTopologys}; the referenced set is null until that method
     * has run once.
     */
    public AtomicReference<Set> getTopologyIdNeedDownload() {
        return this.needDownloadTopologys;
    }

    /**
     * Builds the base class path for a child JVM: the entries of the current class
     * path plus every {@code *.jar} directly under {@code stormHome} and
     * {@code stormHome/lib}, minus the jars selected by {@link #setFilterJars(Map)}.
     *
     * <p>Entries are joined with ':' and the result ends with a trailing ':' (callers
     * append further entries). A directory whose listing is unavailable is skipped.
     *
     * @param stormHome JStorm installation directory, or {@code null} to use only the current class path
     * @param totalConf merged configuration
     * @return the ':'-separated class path with a trailing ':'
     */
    private String getClassPath(String stormHome, Map totalConf) {
        HashSet<String> classSet = new HashSet<String>();
        for (String classJar : JStormUtils.current_classpath().split(":")) {
            if (!StringUtils.isBlank((String)classJar)) {
                classSet.add(classJar);
            }
        }

        if (stormHome != null) {
            this.addJarsInDir(classSet, stormHome);
            this.addJarsInDir(classSet, stormHome + File.separator + "lib");
        }

        Set<String> filterJars = this.setFilterJars(totalConf);
        StringBuilder sb = new StringBuilder();
        for (String jar : classSet) {
            if (SyncProcessEvent.isKeyContain(filterJars, jar)) {
                LOG.info("Remove " + jar);
                continue;
            }
            sb.append(jar).append(":");
        }
        return sb.toString();
    }

    /** Adds every {@code *.jar} file listed directly under {@code dir} to {@code classSet}. */
    private void addJarsInDir(Set<String> classSet, String dir) {
        List<String> files = PathUtils.read_dir_contents(dir);
        if (files == null) {
            // The listing helper is null-checked elsewhere in this class, so a missing
            // or unreadable directory must not abort the worker launch here.
            LOG.warn("Can't list jars under " + dir);
            return;
        }
        for (String file : files) {
            if (file.endsWith(".jar")) {
                classSet.add(dir + File.separator + file);
            }
        }
    }

    /**
     * Builds the topology specific part of the worker class path: the optional
     * "topology.classpath" setting, then one entry per library named in
     * "topology.lib.name", then the topology jar itself, joined with ':'.
     *
     * @param stormJar  path of the topology jar
     * @param totalConf merged configuration
     * @param stormRoot directory holding the topology's distribution files
     * @return the ':'-separated worker class path
     */
    public String getWorkerClassPath(String stormJar, Map totalConf, String stormRoot) {
        StringBuilder sb = new StringBuilder();

        String topologyClasspath = (String)totalConf.get("topology.classpath");
        if (!StringUtils.isBlank(topologyClasspath)) {
            sb.append(topologyClasspath).append(":");
        }

        // The configuration map is untyped; the library names are read as strings,
        // which is what the raw iteration in the previous version intended.
        @SuppressWarnings("unchecked")
        List<String> otherLibs = (List<String>)totalConf.get("topology.lib.name");
        if (otherLibs != null) {
            for (String libName : otherLibs) {
                sb.append(StormConfig.stormlib_path(stormRoot, libName)).append(":");
            }
        }

        sb.append(stormJar);
        return sb.toString();
    }

    /**
     * Returns the extra JVM options for a worker, prefixed with a single space:
     * "topology.worker.childopts" when set, otherwise the configured worker GC
     * options when set, otherwise just the space.
     *
     * @param stormConf merged configuration
     * @return a string starting with one space followed by the selected options
     */
    public String getChildOpts(Map stormConf) {
        Object explicitOpts = stormConf.get("topology.worker.childopts");
        if (explicitOpts != null) {
            return " " + (String)explicitOpts;
        }
        Object gcOpts = ConfigExtension.getWorkerGc(stormConf);
        if (gcOpts != null) {
            return " " + gcOpts;
        }
        return " ";
    }

    /**
     * Builds the logging related {@code -D} options for a child JVM.
     *
     * <p>The logback configuration is chosen in this order: user defined setting,
     * the {@code logback.configurationFile} system property of this JVM,
     * {@code <stormHome>/conf/jstorm.logback.xml}, and finally the bare file name
     * {@code jstorm.logback.xml}. A user defined log4j configuration is appended when set.
     *
     * @param conf         merged configuration
     * @param stormHome    JStorm installation directory, may be blank
     * @param topologyName name of the topology
     * @param port         worker port, used for the log file name
     * @return the options, each one preceded by a space
     */
    public String getLogParameter(Map conf, String stormHome, String topologyName, int port) {
        String logFileName = JStormUtils.genLogName(topologyName, port);
        StringBuilder options = new StringBuilder();

        String logDir = System.getProperty("jstorm.log.dir");
        if (!StringUtils.isBlank((String)logDir)) {
            options.append(" -Djstorm.log.dir=").append(logDir);
        }
        options.append(" -Dlogfile.name=").append(logFileName);
        options.append(" -Dtopology.name=").append(topologyName);

        String userDefLogbackConf = ConfigExtension.getUserDefinedLogbackConf(conf);
        String inheritedLogbackConf = System.getProperty("logback.configurationFile");
        String logbackConf;
        if (!StringUtils.isBlank((String)userDefLogbackConf)) {
            LOG.info("Use user defined logback conf " + userDefLogbackConf);
            logbackConf = userDefLogbackConf;
        } else if (!StringUtils.isBlank((String)inheritedLogbackConf)) {
            logbackConf = inheritedLogbackConf;
        } else if (!StringUtils.isBlank((String)stormHome)) {
            logbackConf = stormHome + File.separator + "conf" + File.separator + "jstorm.logback.xml";
        } else {
            logbackConf = "jstorm.logback.xml";
        }
        options.append(" -Dlogback.configurationFile=").append(logbackConf);

        String userDefLog4jConf = ConfigExtension.getUserDefinedLog4jConf(conf);
        if (!StringUtils.isBlank((String)userDefLog4jConf)) {
            LOG.info("Use user defined log4j conf " + userDefLog4jConf);
            options.append(" -Dlog4j.configuration=").append(userDefLog4jConf);
        }
        return options.toString();
    }

    /**
     * Builds the GC log and heap dump options for a worker and prepares the log
     * location: the per-topology log directory is created (best effort) and an
     * existing GC log is moved aside to {@code <gc log>.old}.
     *
     * @param topologyId id of the topology
     * @param port       worker port, part of the GC log file name
     * @param totalConf  merged configuration (not used here)
     * @return the GC options surrounded by spaces, with "/./" collapsed to "/"
     */
    private String getGcDumpParam(String topologyId, int port, Map totalConf) {
        Date now = new Date();
        String nowStr = TimeFormat.getSecond(now);
        String topologyName = Common.getTopologyNameById(topologyId);
        String logPath = JStormUtils.getLogDir();
        String gcPath = PathUtils.join(logPath, topologyName);
        try {
            FileUtils.forceMkdir((File)new File(gcPath));
        }
        catch (Exception e) {
            // Still best effort: the worker is launched anyway, but the failure is no
            // longer invisible when the GC log later turns out to be missing.
            LOG.warn("Failed to create gc log directory " + gcPath, (Throwable)e);
        }
        String gcLogFile = PathUtils.join(logPath, topologyName, topologyName + "-worker-" + port + "-gc.log");
        String dumpFile = PathUtils.join(logPath, topologyName, "java-" + topologyId + "-" + nowStr + ".hprof");
        // Keep the previous run's GC log instead of letting the new JVM overwrite it.
        PathUtils.mv(gcLogFile, gcLogFile + ".old");
        StringBuilder gc = new StringBuilder(256);
        gc.append(" -Xloggc:").append(gcLogFile).append(" -verbose:gc -XX:+PrintGCDateStamps -XX:+PrintGCDetails").append(" -XX:HeapDumpPath=").append(dumpFile).append(" ");
        return gc.toString().replace("/./", "/");
    }

    /**
     * Builds the memory and GC related JVM options for a worker.
     *
     * <p>{@code -Xms} is the configured minimum, {@code -Xmx} the assigned memory.
     * {@code -Xmn} is half of the minimum when the minimum is below half of the
     * assigned memory, otherwise half of the assigned memory. {@code PermSize} is
     * 1/32 of the assigned memory from 2G upwards and 1/16 below; {@code MaxPermSize}
     * is always 1/16. Parallel GC threads are 1.5 per G above 4G, else 4. Child
     * options, GC dump options and the assignment's own JVM options follow.
     *
     * @param assignment assignment of the worker
     * @param totalConf  merged configuration
     * @param topologyId id of the topology
     * @param port       worker port
     * @return the options as one string
     */
    public String getWorkerMemParameter(LocalAssignment assignment, Map totalConf, String topologyId, Integer port) {
        long memSize = assignment.getMem();
        long memMinSize = ConfigExtension.getMemMinSizePerWorker(totalConf);
        long memGsize = memSize / JStormUtils.SIZE_1_G;

        int gcThreadsNum = 4;
        if (memGsize > 4L) {
            gcThreadsNum = (int)((double)memGsize * 1.5);
        }

        String childOpts = this.getChildOpts(totalConf) + this.getGcDumpParam(topologyId, port, totalConf);

        long youngGenSize = memMinSize < memSize / 2L ? memMinSize / 2L : memSize / 2L;
        long permSize = memGsize >= 2L ? memSize / 32L : memSize / 16L;

        StringBuilder options = new StringBuilder();
        options.append(" -Xms").append(memMinSize).append(" -Xmx").append(memSize).append(" ");
        options.append(" -Xmn").append(youngGenSize).append(" ");
        options.append(" -XX:PermSize=").append(permSize);
        options.append(" -XX:MaxPermSize=").append(memSize / 16L);
        options.append(" -XX:ParallelGCThreads=").append(gcThreadsNum);
        options.append(" ").append(childOpts);

        String assignmentJvm = assignment.getJvm();
        if (!StringUtils.isBlank((String)assignmentJvm)) {
            options.append(" ").append(assignmentJvm);
        }
        return options.toString();
    }

    /**
     * Asks the sandbox maker for the policy parameter of a worker, substituting
     * {@code %CLASS_PATH%} with {@code classpath + ":" + workerClassPath}.
     *
     * @param classpath       base class path
     * @param workerClassPath topology specific class path
     * @param workerId        id of the worker
     * @return whatever {@code SandBoxMaker.sandboxPolicy} returns for this worker
     * @throws IOException if the sandbox maker fails
     */
    public String getSandBoxParameter(String classpath, String workerClassPath, String workerId) throws IOException {
        HashMap<String, String> placeholders = new HashMap<String, String>();
        placeholders.put("%CLASS_PATH%", classpath + ":" + workerClassPath);
        return this.sandBoxMaker.sandboxPolicy(workerId, placeholders);
    }

    /**
     * Assembles the full command line that starts a worker JVM: optional cgroup
     * prefix, {@code java -server}, memory/GC options, library path, JStorm home,
     * logging options, sandbox parameter, class path, the worker main class and its
     * arguments (topology id, supervisor id, port, worker id, worker class path).
     *
     * <p>If preparing the cgroup fails, the error is logged and the command is built
     * without a cgroup prefix.
     *
     * @param assignment   assignment of the worker
     * @param totalConf    merged configuration
     * @param stormHome    JStorm installation directory
     * @param topologyId   id of the topology
     * @param supervisorId id of the launching supervisor
     * @param workerId     id of the new worker
     * @param port         worker port
     * @return the worker command line
     * @throws IOException if the topology paths or the sandbox parameter cannot be resolved
     */
    public String getWorkerParameter(LocalAssignment assignment, Map totalConf, String stormHome, String topologyId, String supervisorId, String workerId, Integer port) throws IOException {
        String stormRoot = StormConfig.supervisor_stormdist_root(this.conf, topologyId);
        String stormJar = StormConfig.stormjar_path(stormRoot);

        StringBuilder cmd = new StringBuilder();
        try {
            if (this.cgroupManager != null) {
                cmd.append(this.cgroupManager.startNewWorker(totalConf, assignment.getCpu(), workerId));
            }
        }
        catch (Exception e) {
            LOG.error("Failed to prepare cgroup to workerId: " + workerId, (Throwable)e);
            // Drop anything appended so far; the worker is started without cgroup.
            cmd.setLength(0);
        }

        cmd.append("java -server ");
        cmd.append(this.getWorkerMemParameter(assignment, totalConf, topologyId, port));
        cmd.append(" -Djava.library.path=").append((String)totalConf.get("java.library.path"));
        cmd.append(" -Djstorm.home=").append(stormHome);
        cmd.append(this.getLogParameter(totalConf, stormHome, assignment.getTopologyName(), port));

        String classpath = this.getClassPath(stormHome, totalConf);
        String workerClassPath = this.getWorkerClassPath(stormJar, totalConf, stormRoot);
        cmd.append(this.getSandBoxParameter(classpath, workerClassPath, workerId));

        cmd.append(" -cp ").append(classpath);
        if (!ConfigExtension.isEnableTopologyClassLoader(totalConf)) {
            cmd.append(":").append(workerClassPath);
        }

        cmd.append(" com.alibaba.jstorm.daemon.worker.Worker ").append(topologyId);
        cmd.append(" ").append(supervisorId);
        cmd.append(" ").append(port);
        cmd.append(" ").append(workerId);
        cmd.append(" ").append(workerClassPath);
        return cmd.toString();
    }

    /**
     * Builds the command prefix that runs the worker through {@link ProcessLauncher},
     * or returns an empty string when the process launcher is disabled.
     *
     * <p>When worker output redirection is enabled, the {@code -Dlogfile.name=} option
     * of the launcher is replaced by the redirect target. The target is inserted
     * literally: '$' and '\' in it are not treated as regex replacement syntax.
     *
     * @param assignment assignment of the worker
     * @param totalConf  merged configuration
     * @param stormHome  JStorm installation directory
     * @param topologyId id of the topology
     * @param port       worker port
     * @return the launcher command prefix, or "" when the launcher is disabled
     * @throws IOException if the topology paths cannot be resolved
     */
    public String getLauncherParameter(LocalAssignment assignment, Map totalConf, String stormHome, String topologyId, int port) throws IOException {
        if (!ConfigExtension.isProcessLauncherEnable(totalConf)) {
            return "";
        }
        String stormRoot = StormConfig.supervisor_stormdist_root(this.conf, topologyId);
        String stormJar = StormConfig.stormjar_path(stormRoot);

        StringBuilder sb = new StringBuilder();
        sb.append(" java ");
        sb.append(ConfigExtension.getProcessLauncherChildOpts(totalConf));
        sb.append(this.getLogParameter(totalConf, stormHome, assignment.getTopologyName(), port));
        sb.append(" -cp ");
        sb.append(this.getClassPath(stormHome, totalConf));
        if (!ConfigExtension.isEnableTopologyClassLoader(totalConf)) {
            sb.append(":").append(stormJar);
        }
        sb.append(" ").append(ProcessLauncher.class.getName()).append(" ");

        String launcherCmd = sb.toString();
        if (ConfigExtension.getWorkerRedirectOutput(totalConf)) {
            String outFile = "-Dlogfile.name=" + this.getWorkerRedirectOutput(totalConf, assignment, port) + " ";
            // replaceAll interprets '$' and '\' in the replacement; quote it so a file
            // or topology name containing them is inserted verbatim.
            launcherCmd = launcherCmd.replaceAll("-Dlogfile\\.name=.*?\\s", java.util.regex.Matcher.quoteReplacement(outFile));
        }
        return launcherCmd;
    }

    /**
     * Starts a worker as a separate process: merges the supervisor and topology
     * configuration, prepares the environment variables, builds the launcher and
     * worker command lines and hands the command to {@code JStormUtils.launchProcess}.
     *
     * @param conf          supervisor configuration
     * @param sharedContext not used by this variant
     * @param topologyId    id of the topology
     * @param supervisorId  id of the launching supervisor
     * @param port          worker port
     * @param workerId      id of the new worker
     * @param assignment    assignment of the worker
     * @throws IOException if the topology configuration or paths cannot be read, or the launch fails
     */
    public void launchWorker(Map conf, IContext sharedContext, String topologyId, String supervisorId, Integer port, String workerId, LocalAssignment assignment) throws IOException {
        Map stormConf = StormConfig.read_supervisor_topology_conf(conf, topologyId);

        String stormHome = System.getProperty("jstorm.home");
        if (StringUtils.isBlank((String)stormHome)) {
            stormHome = "./";
        }

        // Topology settings override supervisor settings.
        HashMap totalConf = new HashMap();
        totalConf.putAll(conf);
        totalConf.putAll(stormConf);

        HashMap<String, String> environment = new HashMap<String, String>();
        environment.put("REDIRECT", ConfigExtension.getWorkerRedirectOutput(totalConf) ? "true" : "false");
        environment.put("LD_LIBRARY_PATH", (String)totalConf.get("java.library.path"));
        environment.put("jstorm.home", stormHome);
        environment.put("jstorm.workerId", workerId);

        String launcherCmd = this.getLauncherParameter(assignment, totalConf, stormHome, topologyId, port);
        String workerCmd = this.getWorkerParameter(assignment, totalConf, stormHome, topologyId, supervisorId, workerId, port);
        String cmd = (launcherCmd + " " + workerCmd).replace("%JSTORM_HOME%", stormHome);

        LOG.info("Launching worker with command: " + cmd);
        LOG.info("Environment:" + ((Object)environment).toString());
        JStormUtils.launchProcess(cmd, environment, true);
    }

    /**
     * Shuts down every local worker that is neither valid nor a tracked worker still
     * waiting to start, and returns the ports that must not get a new worker.
     *
     * <p>Side effects: the removed workers are deleted from {@code localWorkerStats};
     * when no worker is being killed any more, entries of
     * {@code taskCleanupTimeoutMap} for topologies without an assignment are removed.
     *
     * @param localWorkerStats      worker id -> state and heartbeat; modified in place
     * @param localAssignments      port -> assignment for this supervisor
     * @param taskCleanupTimeoutMap topology id -> cleanup timeout; may be {@code null}
     * @return ports of valid workers plus ports of tracked workers that have not started yet
     */
    private Set<Integer> killUselessWorkers(Map<String, StateHeartbeat> localWorkerStats, Map<Integer, LocalAssignment> localAssignments, Map<String, Integer> taskCleanupTimeoutMap) {
        HashMap<String, String> removed = new HashMap<String, String>();
        HashSet<Integer> keepPorts = new HashSet<Integer>();

        for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
            String workerId = entry.getKey();
            StateHeartbeat hbState = entry.getValue();

            // Launched recently and still waiting for its first heartbeat: leave it alone.
            if (this.workerIdToStartTimeAndPort.containsKey(workerId) && hbState.getState().equals((Object)State.notStarted)) {
                continue;
            }
            if (hbState.getState().equals((Object)State.valid)) {
                keepPorts.add(hbState.getHeartbeat().getPort());
                continue;
            }

            removed.put(workerId, hbState.getHeartbeat() != null ? hbState.getHeartbeat().getTopologyId() : null);
            if (!this.killingWorkers.containsKey(workerId)) {
                LOG.info("Shutting down and clearing state for id " + workerId + ";State:" + hbState);
            }
        }

        this.shutWorker(this.conf, this.supervisorId, removed, this.workerThreadPids, this.cgroupManager, false, this.killingWorkers, taskCleanupTimeoutMap);

        // The timeout map comes straight from local state and may be absent; only
        // prune it when it exists and no kill is still in progress.
        if (this.killingWorkers.size() == 0 && taskCleanupTimeoutMap != null) {
            HashSet<String> activeTopologies = new HashSet<String>();
            for (LocalAssignment assignment : localAssignments.values()) {
                activeTopologies.add(assignment.getTopologyId());
            }
            HashSet<String> obsoleteTopologys = new HashSet<String>();
            for (String topologyId : taskCleanupTimeoutMap.keySet()) {
                if (!activeTopologies.contains(topologyId)) {
                    obsoleteTopologys.add(topologyId);
                }
            }
            for (String topologyId : obsoleteTopologys) {
                taskCleanupTimeoutMap.remove(topologyId);
            }
        }

        for (String workerId : removed.keySet()) {
            localWorkerStats.remove(workerId);
        }

        // Ports of workers that were launched but have not reported yet stay reserved.
        for (Map.Entry<String, Pair<Integer, Integer>> tracked : this.workerIdToStartTimeAndPort.entrySet()) {
            StateHeartbeat hbState = localWorkerStats.get(tracked.getKey());
            if (hbState != null && hbState.getState().equals((Object)State.notStarted)) {
                keepPorts.add(tracked.getValue().getSecond());
            }
        }
        return keepPorts;
    }

    /**
     * Determines the file that receives a worker's redirected output.
     *
     * @param totalConf  merged configuration
     * @param assignment assignment of the worker
     * @param port       worker port
     * @return {@code <configured file>-<topology name>-<port>} when a redirect file is
     *         configured; otherwise the generated log name plus ".out", or
     *         "/dev/null" when no log name could be generated
     */
    private String getWorkerRedirectOutput(Map totalConf, LocalAssignment assignment, int port) {
        String generatedLogName = JStormUtils.genLogName(assignment.getTopologyName(), port);
        String configuredFile = ConfigExtension.getWorkerRedirectOutputFile(totalConf);
        if (configuredFile != null) {
            return configuredFile + "-" + assignment.getTopologyName() + "-" + port;
        }
        if (generatedLogName == null) {
            return "/dev/null";
        }
        return generatedLogName + ".out";
    }

    /**
     * Launches a worker for every assignment selected by
     * {@code JStormUtils.select_keys_pred(keepPorts, localAssignments)}, skipping
     * topologies whose binaries failed to download, and records the launched workers.
     *
     * <p>In "distributed" mode the worker is started as a process, in "local" mode
     * inside this JVM. A launch failure is reported, logged and rethrown; in that case
     * the workers launched earlier in the same call are not recorded.
     *
     * @param keepPorts                 ports that must not get a new worker
     * @param localAssignments          port -> assignment for this supervisor
     * @param downloadFailedTopologyIds topologies whose binaries are damaged
     * @throws Exception if a worker directory cannot be created or a launch fails
     */
    private void startNewWorkers(Set<Integer> keepPorts, Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds) throws Exception {
        Map<Integer, LocalAssignment> pending = JStormUtils.select_keys_pred(keepPorts, localAssignments);
        HashMap<Integer, String> launchedIds = new HashMap<Integer, String>();

        for (Map.Entry<Integer, LocalAssignment> slot : pending.entrySet()) {
            Integer port = slot.getKey();
            LocalAssignment assignment = slot.getValue();

            boolean binaryDamaged = assignment != null && assignment.getTopologyId() != null && downloadFailedTopologyIds.contains(assignment.getTopologyId());
            if (binaryDamaged) {
                LOG.info("Can't start this worker: " + port + " about the topology: " + assignment.getTopologyId() + ", due to the damaged binary !!");
                continue;
            }

            String workerId = UUID.randomUUID().toString();
            launchedIds.put(port, workerId);
            try {
                StormConfig.worker_pids_root(this.conf, workerId);
            }
            catch (IOException e1) {
                LOG.error("Failed to create " + workerId + " localdir", (Throwable)e1);
                throw e1;
            }

            LOG.info("Launching worker with assiangment " + assignment + " for the supervisor " + this.supervisorId + " on port " + port + " with id " + workerId);

            try {
                String clusterMode = StormConfig.cluster_mode(this.conf);
                if (clusterMode.equals("distributed")) {
                    this.launchWorker(this.conf, this.sharedContext, assignment.getTopologyId(), this.supervisorId, port, workerId, assignment);
                } else if (clusterMode.equals("local")) {
                    this.launchWorker(this.conf, this.sharedContext, assignment.getTopologyId(), this.supervisorId, port, workerId, this.workerThreadPids);
                }
            }
            catch (Exception e) {
                this.workerReportError.report(assignment.getTopologyId(), port, assignment.getTaskIds(), JStormUtils.getErrorInfo(e), 502);
                LOG.error("Failed to launchWorker workerId:" + workerId + ":" + port, (Throwable)e);
                throw e;
            }
        }
        this.markAllNewWorkers(launchedIds);
    }

    /**
     * Checks through the operating system whether all processes of a worker are gone.
     *
     * @param workerId id of the worker
     * @return {@code true} only when the check is enabled, at least one pid is known
     *         and every known pid is dead; {@code false} in all other cases,
     *         including when the pids cannot be read
     */
    boolean isWorkerDead(String workerId) {
        if (!ConfigExtension.isCheckWorkerAliveBySystemInfo(this.conf)) {
            return false;
        }
        try {
            List<String> pids = SyncProcessEvent.getPid(this.conf, workerId);
            if (pids == null || pids.size() == 0) {
                return false;
            }
            for (String pid : pids) {
                if (!JStormUtils.isProcDead(pid)) {
                    // One live process is enough to call the worker alive.
                    return false;
                }
                LOG.info("Found " + workerId + " is dead ");
            }
            return true;
        }
        catch (IOException e) {
            LOG.info("Failed to check whether worker is dead through /proc/pid", (Throwable)e);
            return false;
        }
    }
}

