package com.alibaba.jstorm.daemon.supervisor;

import backtype.storm.Config;
import backtype.storm.GenericOptionsParser;
import backtype.storm.messaging.IContext;
import backtype.storm.utils.LocalState;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.worker.LocalAssignment;
import com.alibaba.jstorm.daemon.worker.ProcessSimulator;
import com.alibaba.jstorm.daemon.worker.State;
import com.alibaba.jstorm.daemon.worker.Worker;
import com.alibaba.jstorm.daemon.worker.WorkerHeartbeat;
import com.alibaba.jstorm.daemon.worker.WorkerReportError;
import com.alibaba.jstorm.task.error.ErrorConstants;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.Pair;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.ProcessLauncher;
import com.alibaba.jstorm.utils.TimeFormat;
import com.alibaba.jstorm.utils.TimeUtils;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.class */
class SyncProcessEvent extends ShutdownWork {
    private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
    private LocalState localState;
    private Map conf;
    private ConcurrentHashMap<String, String> workerThreadPids;
    private String supervisorId;
    private IContext sharedContext;
    private CgroupManager cgroupManager;
    private SandBoxMaker sandBoxMaker;
    private Map<String, Integer> killingWorkers;
    private int lastTime;
    private WorkerReportError workerReportError;
    private Map<Integer, String> portToWorkerId = new HashMap();
    private Map<String, Pair<Integer, Integer>> workerIdToStartTimeAndPort = new HashMap();
    private AtomicReference<Set> needDownloadTopologys = new AtomicReference<>();

    public SyncProcessEvent(String str, Map map, LocalState localState, ConcurrentHashMap<String, String> concurrentHashMap, IContext iContext, WorkerReportError workerReportError) {
        this.supervisorId = str;
        this.conf = map;
        this.localState = localState;
        this.workerThreadPids = concurrentHashMap;
        this.sharedContext = iContext;
        this.sandBoxMaker = new SandBoxMaker(map);
        if (ConfigExtension.isEnableCgroup(map)) {
            this.cgroupManager = new CgroupManager(map);
        }
        this.killingWorkers = new HashMap();
        this.workerReportError = workerReportError;
    }

    @Override // com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
    public void run() {
    }

    public void run(Map<Integer, LocalAssignment> map, Set<String> set) {
        LOG.debug("Syncing processes, interval seconds:" + TimeUtils.time_delta(this.lastTime));
        this.lastTime = TimeUtils.current_time_secs();
        if (map == null) {
            try {
                map = new HashMap();
            } catch (Exception e) {
                LOG.error("Failed Sync Process", e);
                return;
            }
        }
        LOG.debug("Assigned tasks: " + map);
        try {
            Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(this.conf, this.localState, map);
            LOG.debug("Allocated: " + localWorkerStats);
            Set<Integer> set2 = null;
            try {
                Map<String, Integer> map2 = (Map) this.localState.get(Common.LS_TASK_CLEANUP_TIMEOUT);
                set2 = killUselessWorkers(localWorkerStats, map, map2);
                this.localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, map2);
            } catch (IOException e2) {
                LOG.error("Failed to kill workers", e2);
            }
            checkNewWorkers(this.conf);
            checkNeedUpdateTopologys(localWorkerStats, map);
            startNewWorkers(set2, map, set);
        } catch (Exception e3) {
            LOG.error("Failed to get Local worker stats");
            throw e3;
        }
    }

    public void checkNeedUpdateTopologys(Map<String, StateHeartbeat> map, Map<Integer, LocalAssignment> map2) throws Exception {
        Set<String> hashSet = new HashSet();
        Iterator<Map.Entry<Integer, LocalAssignment>> it = map2.entrySet().iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getValue().getTopologyId());
        }
        for (StateHeartbeat stateHeartbeat : map.values()) {
            if (!stateHeartbeat.getState().equals(State.notStarted)) {
                hashSet.remove(stateHeartbeat.getHeartbeat().getTopologyId());
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        HashSet hashSet2 = new HashSet();
        for (String str : hashSet) {
            try {
                if ((currentTimeMillis - StormConfig.get_supervisor_topology_Bianrymodify_time(this.conf, str)) / 1000 < 120) {
                    LOG.debug("less 2 miniute ,so removed " + str);
                    hashSet2.add(str);
                }
            } catch (Exception e) {
                LOG.error("Failed to get the time of file last modification for topology" + str, e);
                hashSet2.add(str);
            }
        }
        hashSet.removeAll(hashSet2);
        if (hashSet.size() > 0) {
            LOG.debug("Following topologies are going to re-download the jars, " + hashSet);
        }
        this.needDownloadTopologys.set(hashSet);
    }

    public void markAllNewWorkers(Map<Integer, String> map) {
        int current_time_secs = TimeUtils.current_time_secs();
        for (Map.Entry<Integer, String> entry : map.entrySet()) {
            String str = this.portToWorkerId.get(entry.getKey());
            if (str != null) {
                this.workerIdToStartTimeAndPort.remove(str);
                LOG.info("exit port is still occupied by old workerId, so remove useless " + str + " form workerIdToStartTimeAndPort");
            }
            this.portToWorkerId.put(entry.getKey(), entry.getValue());
            this.workerIdToStartTimeAndPort.put(entry.getValue(), new Pair<>(Integer.valueOf(current_time_secs), entry.getKey()));
        }
    }

    public void checkNewWorkers(Map map) throws IOException, InterruptedException {
        HashSet<String> hashSet = new HashSet();
        for (Map.Entry<String, Pair<Integer, Integer>> entry : this.workerIdToStartTimeAndPort.entrySet()) {
            String key = entry.getKey();
            int intValue = entry.getValue().getFirst().intValue();
            if (((WorkerHeartbeat) StormConfig.worker_state(map, key).get(Common.LS_WORKER_HEARTBEAT)) != null) {
                LOG.info("Successfully start worker " + key);
                hashSet.add(key);
            } else if (TimeUtils.current_time_secs() - intValue < JStormUtils.parseInt(map.get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS)).intValue()) {
                LOG.info(key + " still hasn't started");
            } else {
                LOG.error("Failed to start Worker " + key);
                hashSet.add(key);
            }
        }
        for (String str : hashSet) {
            Integer second = this.workerIdToStartTimeAndPort.get(str).getSecond();
            this.workerIdToStartTimeAndPort.remove(str);
            this.portToWorkerId.remove(second);
        }
    }

    public Map<Integer, String> getPortToWorkerId() {
        return this.portToWorkerId;
    }

    public Map<String, StateHeartbeat> getLocalWorkerStats(Map map, LocalState localState, Map<Integer, LocalAssignment> map2) throws Exception {
        State state;
        HashMap hashMap = new HashMap();
        int current_time_secs = TimeUtils.current_time_secs();
        for (Map.Entry<String, WorkerHeartbeat> entry : readWorkerHeartbeats(map).entrySet()) {
            String key = entry.getKey();
            WorkerHeartbeat value = entry.getValue();
            if (value == null) {
                state = State.notStarted;
                Pair<Integer, Integer> pair = this.workerIdToStartTimeAndPort.get(key);
                if (pair != null && map2.get(pair.getSecond()) == null) {
                    LOG.info("Following worker don't exit assignment, so remove this port=" + pair.getSecond());
                    state = State.disallowed;
                    Integer second = this.workerIdToStartTimeAndPort.get(key).getSecond();
                    this.workerIdToStartTimeAndPort.remove(key);
                    this.portToWorkerId.remove(second);
                }
            } else if (!matchesAssignment(value, map2)) {
                state = State.disallowed;
            } else if (current_time_secs - value.getTimeSecs() > JStormUtils.parseInt(map.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)).intValue()) {
                if (!this.killingWorkers.containsKey(key)) {
                    this.workerReportError.report(value.getTopologyId(), value.getPort(), value.getTaskIds(), " it is likely to be out of memory, the worker is time out ", ErrorConstants.CODE_WORKER_TIMEOUT);
                }
                state = State.timedOut;
            } else if (isWorkerDead(key)) {
                if (!this.killingWorkers.containsKey(key)) {
                    this.workerReportError.report(value.getTopologyId(), value.getPort(), value.getTaskIds(), "Worker is dead ", ErrorConstants.CODE_WORKER_DEAD);
                }
                state = State.timedOut;
            } else {
                state = State.valid;
            }
            if (state == State.valid) {
                LOG.debug("Worker:" + key + " state:" + state + " WorkerHeartbeat: " + value + " at supervisor time-secs " + current_time_secs);
            } else if (!this.killingWorkers.containsKey(key)) {
                LOG.info("Worker:" + key + " state:" + state + " WorkerHeartbeat:" + value + " assignedTasks:" + map2 + " at supervisor time-secs " + current_time_secs);
            }
            hashMap.put(key, new StateHeartbeat(state, value));
        }
        return hashMap;
    }

    public boolean matchesAssignment(WorkerHeartbeat workerHeartbeat, Map<Integer, LocalAssignment> map) {
        boolean z = true;
        LocalAssignment localAssignment = map.get(workerHeartbeat.getPort());
        if (localAssignment == null) {
            LOG.debug("Following worker has been removed, port=" + workerHeartbeat.getPort() + ", assignedTasks=" + map);
            z = false;
        } else if (!workerHeartbeat.getTopologyId().equals(localAssignment.getTopologyId())) {
            LOG.info("topology id not equal whb=" + workerHeartbeat.getTopologyId() + ",localAssignment=" + localAssignment.getTopologyId());
            z = false;
        }
        return z;
    }

    public Map<String, WorkerHeartbeat> readWorkerHeartbeats(Map map) throws Exception {
        HashMap hashMap = new HashMap();
        String worker_root = StormConfig.worker_root(map);
        List<String> read_dir_contents = PathUtils.read_dir_contents(worker_root);
        if (read_dir_contents == null) {
            LOG.info("No worker dir under " + worker_root);
            return hashMap;
        }
        for (String str : read_dir_contents) {
            hashMap.put(str, readWorkerHeartbeat(map, str));
        }
        return hashMap;
    }

    public WorkerHeartbeat readWorkerHeartbeat(Map map, String str) throws Exception {
        try {
            return (WorkerHeartbeat) StormConfig.worker_state(map, str).get(Common.LS_WORKER_HEARTBEAT);
        } catch (Exception e) {
            LOG.error("Failed to get worker Heartbeat", e);
            return null;
        }
    }

    public void launchWorker(Map map, IContext iContext, String str, String str2, Integer num, String str3, ConcurrentHashMap<String, String> concurrentHashMap) throws Exception {
        String uuid = UUID.randomUUID().toString();
        ProcessSimulator.registerProcess(uuid, Worker.mk_worker(map, iContext, str, str2, num.intValue(), str3, null));
        concurrentHashMap.put(str3, uuid);
    }

    private Set<String> setFilterJars(Map map) {
        HashSet hashSet = new HashSet();
        if (!ConfigExtension.isEnableTopologyClassLoader(map)) {
            if (StringUtils.isBlank(ConfigExtension.getUserDefinedLog4jConf(map)) ? false : true) {
                hashSet.add("log4j-over-slf4j");
                hashSet.add("logback-core");
                hashSet.add("logback-classic");
            } else {
                hashSet.add("slf4j-log4j");
                hashSet.add("log4j");
            }
        }
        String str = (String) map.get("exclude.jars");
        if (!StringUtils.isBlank(str)) {
            for (String str2 : str.split(",")) {
                hashSet.add(str2);
            }
        }
        LOG.info("Remove jars " + hashSet);
        return hashSet;
    }

    public static boolean isKeyContain(Collection<String> collection, String str) {
        if (collection == null) {
            return false;
        }
        String name = new File(str).getName();
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            if (Pattern.compile(it.next() + "[-._0-9]*.jar").matcher(name).matches()) {
                return true;
            }
        }
        return false;
    }

    public AtomicReference<Set> getTopologyIdNeedDownload() {
        return this.needDownloadTopologys;
    }

    private String getClassPath(String str, Map map) {
        String[] split = JStormUtils.current_classpath().split(":");
        HashSet<String> hashSet = new HashSet();
        for (String str2 : split) {
            if (!StringUtils.isBlank(str2)) {
                hashSet.add(str2);
            }
        }
        if (str != null) {
            for (String str3 : PathUtils.read_dir_contents(str)) {
                if (str3.endsWith(".jar")) {
                    hashSet.add(str + File.separator + str3);
                }
            }
            for (String str4 : PathUtils.read_dir_contents(str + File.separator + "lib")) {
                if (str4.endsWith(".jar")) {
                    hashSet.add(str + File.separator + "lib" + File.separator + str4);
                }
            }
        }
        Set<String> filterJars = setFilterJars(map);
        StringBuilder sb = new StringBuilder();
        for (String str5 : hashSet) {
            if (isKeyContain(filterJars, str5)) {
                LOG.info("Remove " + str5);
            } else {
                sb.append(str5).append(":");
            }
        }
        return sb.toString();
    }

    public String getWorkerClassPath(String str, Map map, String str2) {
        StringBuilder sb = new StringBuilder();
        if (!StringUtils.isBlank((String) map.get(Config.TOPOLOGY_CLASSPATH))) {
            sb.append(map.get(Config.TOPOLOGY_CLASSPATH)).append(":");
        }
        List list = (List) map.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
        if (list != null) {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                sb.append(StormConfig.stormlib_path(str2, (String) it.next())).append(":");
            }
        }
        sb.append(str);
        return sb.toString();
    }

    public String getChildOpts(Map map) {
        String str = " ";
        if (map.get(Config.TOPOLOGY_WORKER_CHILDOPTS) != null) {
            str = str + ((String) map.get(Config.TOPOLOGY_WORKER_CHILDOPTS));
        } else if (ConfigExtension.getWorkerGc(map) != null) {
            str = str + ConfigExtension.getWorkerGc(map);
        }
        return str;
    }

    public String getLogParameter(Map map, String str, String str2, int i) {
        String genLogName = JStormUtils.genLogName(str2, Integer.valueOf(i));
        StringBuilder sb = new StringBuilder();
        String property = System.getProperty("jstorm.log.dir");
        if (!StringUtils.isBlank(property)) {
            sb.append(" -Djstorm.log.dir=").append(property);
        }
        sb.append(" -Dlogfile.name=").append(genLogName);
        sb.append(" -Dtopology.name=").append(str2);
        String userDefinedLogbackConf = ConfigExtension.getUserDefinedLogbackConf(map);
        String property2 = System.getProperty("logback.configurationFile");
        if (!StringUtils.isBlank(userDefinedLogbackConf)) {
            LOG.info("Use user defined logback conf " + userDefinedLogbackConf);
            sb.append(" -Dlogback.configurationFile=").append(userDefinedLogbackConf);
        } else if (!StringUtils.isBlank(property2)) {
            sb.append(" -Dlogback.configurationFile=").append(property2);
        } else if (StringUtils.isBlank(str)) {
            sb.append(" -Dlogback.configurationFile=jstorm.logback.xml");
        } else {
            sb.append(" -Dlogback.configurationFile=").append(str).append(File.separator).append("conf").append(File.separator).append("jstorm.logback.xml");
        }
        String userDefinedLog4jConf = ConfigExtension.getUserDefinedLog4jConf(map);
        if (!StringUtils.isBlank(userDefinedLog4jConf)) {
            LOG.info("Use user defined log4j conf " + userDefinedLog4jConf);
            sb.append(" -Dlog4j.configuration=").append(userDefinedLog4jConf);
        }
        return sb.toString();
    }

    private String getGcDumpParam(String str, int i, Map map) {
        String second = TimeFormat.getSecond(new Date());
        String topologyNameById = Common.getTopologyNameById(str);
        String logDir = JStormUtils.getLogDir();
        try {
            FileUtils.forceMkdir(new File(PathUtils.join(logDir, topologyNameById)));
        } catch (Exception e) {
        }
        String join = PathUtils.join(logDir, topologyNameById, topologyNameById + "-worker-" + i + "-gc.log");
        String join2 = PathUtils.join(logDir, topologyNameById, "java-" + str + "-" + second + ".hprof");
        PathUtils.mv(join, join + ".old");
        StringBuilder sb = new StringBuilder(256);
        sb.append(" -Xloggc:").append(join).append(" -verbose:gc -XX:+PrintGCDateStamps -XX:+PrintGCDetails").append(" -XX:HeapDumpPath=").append(join2).append(" ");
        return sb.toString().replace("/./", "/");
    }

    public String getWorkerMemParameter(LocalAssignment localAssignment, Map map, String str, Integer num) {
        long mem = localAssignment.getMem();
        long memMinSizePerWorker = ConfigExtension.getMemMinSizePerWorker(map);
        long j = mem / JStormUtils.SIZE_1_G;
        int i = j > 4 ? (int) (j * 1.5d) : 4;
        String str2 = getChildOpts(map) + getGcDumpParam(str, num.intValue(), map);
        StringBuilder sb = new StringBuilder();
        sb.append(" -Xms").append(memMinSizePerWorker).append(" -Xmx").append(mem).append(" ");
        if (memMinSizePerWorker < mem / 2) {
            sb.append(" -Xmn").append(memMinSizePerWorker / 2).append(" ");
        } else {
            sb.append(" -Xmn").append(mem / 2).append(" ");
        }
        if (j >= 2) {
            sb.append(" -XX:PermSize=").append(mem / 32);
        } else {
            sb.append(" -XX:PermSize=").append(mem / 16);
        }
        sb.append(" -XX:MaxPermSize=").append(mem / 16);
        sb.append(" -XX:ParallelGCThreads=").append(i);
        sb.append(" ").append(str2);
        if (!StringUtils.isBlank(localAssignment.getJvm())) {
            sb.append(" ").append(localAssignment.getJvm());
        }
        return sb.toString();
    }

    public String getSandBoxParameter(String str, String str2, String str3) throws IOException {
        HashMap hashMap = new HashMap();
        hashMap.put(SandBoxMaker.CLASS_PATH_KEY, str + ":" + str2);
        return this.sandBoxMaker.sandboxPolicy(str3, hashMap);
    }

    public String getWorkerParameter(LocalAssignment localAssignment, Map map, String str, String str2, String str3, String str4, Integer num) throws IOException {
        String supervisor_stormdist_root = StormConfig.supervisor_stormdist_root(this.conf, str2);
        String stormjar_path = StormConfig.stormjar_path(supervisor_stormdist_root);
        StringBuilder sb = new StringBuilder();
        try {
            if (this.cgroupManager != null) {
                sb.append(this.cgroupManager.startNewWorker(map, localAssignment.getCpu(), str4));
            }
        } catch (Exception e) {
            LOG.error("Failed to prepare cgroup to workerId: " + str4, e);
            sb = new StringBuilder();
        }
        sb.append("java -server ");
        sb.append(getWorkerMemParameter(localAssignment, map, str2, num));
        sb.append(" -Djava.library.path=").append((String) map.get(Config.JAVA_LIBRARY_PATH));
        sb.append(" -Djstorm.home=").append(str);
        sb.append(getLogParameter(map, str, localAssignment.getTopologyName(), num.intValue()));
        String classPath = getClassPath(str, map);
        String workerClassPath = getWorkerClassPath(stormjar_path, map, supervisor_stormdist_root);
        sb.append(getSandBoxParameter(classPath, workerClassPath, str4));
        sb.append(" -cp ");
        sb.append(classPath);
        if (!ConfigExtension.isEnableTopologyClassLoader(map)) {
            sb.append(":").append(workerClassPath);
        }
        sb.append(" com.alibaba.jstorm.daemon.worker.Worker ");
        sb.append(str2);
        sb.append(" ").append(str3);
        sb.append(" ").append(num);
        sb.append(" ").append(str4);
        sb.append(" ").append(workerClassPath);
        return sb.toString();
    }

    public String getLauncherParameter(LocalAssignment localAssignment, Map map, String str, String str2, int i) throws IOException {
        if (!ConfigExtension.isProcessLauncherEnable(map)) {
            return "";
        }
        String stormjar_path = StormConfig.stormjar_path(StormConfig.supervisor_stormdist_root(this.conf, str2));
        StringBuilder sb = new StringBuilder();
        sb.append(" java ");
        sb.append(ConfigExtension.getProcessLauncherChildOpts(map));
        sb.append(getLogParameter(map, str, localAssignment.getTopologyName(), i));
        sb.append(" -cp ");
        sb.append(getClassPath(str, map));
        if (!ConfigExtension.isEnableTopologyClassLoader(map)) {
            sb.append(":").append(stormjar_path);
        }
        sb.append(" ").append(ProcessLauncher.class.getName()).append(" ");
        String sb2 = sb.toString();
        if (ConfigExtension.getWorkerRedirectOutput(map)) {
            sb2 = sb2.replaceAll("-Dlogfile\\.name=.*?\\s", "-Dlogfile.name=" + getWorkerRedirectOutput(map, localAssignment, i) + " ");
        }
        return sb2;
    }

    public void launchWorker(Map map, IContext iContext, String str, String str2, Integer num, String str3, LocalAssignment localAssignment) throws IOException {
        Map read_supervisor_topology_conf = StormConfig.read_supervisor_topology_conf(map, str);
        String property = System.getProperty("jstorm.home");
        if (StringUtils.isBlank(property)) {
            property = "./";
        }
        HashMap hashMap = new HashMap();
        hashMap.putAll(map);
        hashMap.putAll(read_supervisor_topology_conf);
        HashMap hashMap2 = new HashMap();
        if (ConfigExtension.getWorkerRedirectOutput(hashMap)) {
            hashMap2.put("REDIRECT", "true");
        } else {
            hashMap2.put("REDIRECT", "false");
        }
        hashMap2.put("LD_LIBRARY_PATH", (String) hashMap.get(Config.JAVA_LIBRARY_PATH));
        hashMap2.put("jstorm.home", property);
        hashMap2.put("jstorm.workerId", str3);
        String replace = (getLauncherParameter(localAssignment, hashMap, property, str, num.intValue()) + " " + getWorkerParameter(localAssignment, hashMap, property, str, str2, str3, num)).replace(SandBoxMaker.JSTORM_HOME_KEY, property);
        LOG.info("Launching worker with command: " + replace);
        LOG.info("Environment:" + hashMap2.toString());
        JStormUtils.launchProcess(replace, hashMap2, true);
    }

    private Set<Integer> killUselessWorkers(Map<String, StateHeartbeat> map, Map<Integer, LocalAssignment> map2, Map<String, Integer> map3) {
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        for (Map.Entry<String, StateHeartbeat> entry : map.entrySet()) {
            String key = entry.getKey();
            StateHeartbeat value = entry.getValue();
            if (!this.workerIdToStartTimeAndPort.containsKey(key) || !value.getState().equals(State.notStarted)) {
                if (value.getState().equals(State.valid)) {
                    hashSet.add(value.getHeartbeat().getPort());
                } else {
                    if (value.getHeartbeat() != null) {
                        hashMap.put(key, value.getHeartbeat().getTopologyId());
                    } else {
                        hashMap.put(key, null);
                    }
                    if (!this.killingWorkers.containsKey(key)) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("Shutting down and clearing state for id ").append(key);
                        sb.append(";State:").append(value);
                        LOG.info(sb.toString());
                    }
                }
            }
        }
        shutWorker(this.conf, this.supervisorId, hashMap, this.workerThreadPids, this.cgroupManager, false, this.killingWorkers, map3);
        HashSet hashSet2 = new HashSet();
        if (this.killingWorkers.size() == 0) {
            Iterator<Map.Entry<Integer, LocalAssignment>> it = map2.entrySet().iterator();
            while (it.hasNext()) {
                hashSet2.add(it.next().getValue().getTopologyId());
            }
            HashSet hashSet3 = new HashSet();
            for (String str : map3.keySet()) {
                if (!hashSet2.contains(str)) {
                    hashSet3.add(str);
                }
            }
            Iterator it2 = hashSet3.iterator();
            while (it2.hasNext()) {
                map3.remove((String) it2.next());
            }
        }
        Iterator<String> it3 = hashMap.keySet().iterator();
        while (it3.hasNext()) {
            map.remove(it3.next());
        }
        for (Map.Entry<String, Pair<Integer, Integer>> entry2 : this.workerIdToStartTimeAndPort.entrySet()) {
            StateHeartbeat stateHeartbeat = map.get(entry2.getKey());
            if (stateHeartbeat != null && stateHeartbeat.getState().equals(State.notStarted)) {
                hashSet.add(entry2.getValue().getSecond());
            }
        }
        return hashSet;
    }

    private String getWorkerRedirectOutput(Map map, LocalAssignment localAssignment, int i) {
        String genLogName = JStormUtils.genLogName(localAssignment.getTopologyName(), Integer.valueOf(i));
        String str = genLogName == null ? "/dev/null" : genLogName + ".out";
        String workerRedirectOutputFile = ConfigExtension.getWorkerRedirectOutputFile(map);
        return workerRedirectOutputFile == null ? str : workerRedirectOutputFile + "-" + localAssignment.getTopologyName() + "-" + i;
    }

    private void startNewWorkers(Set<Integer> set, Map<Integer, LocalAssignment> map, Set<String> set2) throws Exception {
        Map select_keys_pred = JStormUtils.select_keys_pred(set, map);
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : select_keys_pred.entrySet()) {
            Integer num = (Integer) entry.getKey();
            LocalAssignment localAssignment = (LocalAssignment) entry.getValue();
            if (localAssignment == null || localAssignment.getTopologyId() == null || !set2.contains(localAssignment.getTopologyId())) {
                String uuid = UUID.randomUUID().toString();
                hashMap.put(num, uuid);
                try {
                    StormConfig.worker_pids_root(this.conf, uuid);
                    StringBuilder sb = new StringBuilder();
                    sb.append("Launching worker with assiangment ");
                    sb.append(localAssignment).append(" for the supervisor ").append(this.supervisorId).append(" on port ").append(num).append(" with id ").append(uuid);
                    LOG.info(sb.toString());
                    try {
                        String cluster_mode = StormConfig.cluster_mode(this.conf);
                        if (cluster_mode.equals("distributed")) {
                            launchWorker(this.conf, this.sharedContext, localAssignment.getTopologyId(), this.supervisorId, num, uuid, localAssignment);
                        } else if (cluster_mode.equals("local")) {
                            launchWorker(this.conf, this.sharedContext, localAssignment.getTopologyId(), this.supervisorId, num, uuid, this.workerThreadPids);
                        }
                    } catch (Exception e) {
                        this.workerReportError.report(localAssignment.getTopologyId(), num, localAssignment.getTaskIds(), JStormUtils.getErrorInfo(e), ErrorConstants.CODE_WORKER_EX);
                        LOG.error("Failed to launchWorker workerId:" + uuid + ":" + num, e);
                        throw e;
                    }
                } catch (IOException e2) {
                    LOG.error("Failed to create " + uuid + " localdir", e2);
                    throw e2;
                }
            } else {
                LOG.info("Can't start this worker: " + num + " about the topology: " + localAssignment.getTopologyId() + ", due to the damaged binary !!");
            }
        }
        markAllNewWorkers(hashMap);
    }

    boolean isWorkerDead(String str) {
        if (!ConfigExtension.isCheckWorkerAliveBySystemInfo(this.conf)) {
            return false;
        }
        try {
            List<String> pid = getPid(this.conf, str);
            if (pid == null || pid.size() == 0) {
                return false;
            }
            Iterator<String> it = pid.iterator();
            while (it.hasNext()) {
                if (!JStormUtils.isProcDead(it.next())) {
                    return false;
                }
                LOG.info("Found " + str + " is dead ");
            }
            return true;
        } catch (IOException e) {
            LOG.info("Failed to check whether worker is dead through /proc/pid", e);
            return false;
        }
    }
}
