/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.supervisor;

import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.supervisor.CgroupManager;
import com.alibaba.jstorm.daemon.worker.ProcessSimulator;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShutdownWork
extends RunnableCallback {
    private static Logger LOG = LoggerFactory.getLogger(ShutdownWork.class);

    public void shutWorker(Map conf, String supervisorId, Map<String, String> removed, ConcurrentHashMap<String, String> workerThreadPids, CgroupManager cgroupManager, boolean block, Map<String, Integer> killingWorkers, Map<String, Integer> taskCleanupTimeoutMap) {
        List pids;
        String topologyId;
        String workerId;
        HashMap<String, List> workerId2Pids = new HashMap<String, List>();
        boolean localMode = false;
        int maxWaitTime = 0;
        if (killingWorkers == null) {
            killingWorkers = new HashMap<String, Integer>();
        }
        for (Map.Entry<String, String> entry : removed.entrySet()) {
            workerId = entry.getKey();
            topologyId = entry.getValue();
            pids = null;
            try {
                pids = ShutdownWork.getPid(conf, workerId);
            }
            catch (IOException e1) {
                LOG.error("Failed to get pid for " + workerId + " of " + topologyId);
            }
            workerId2Pids.put(workerId, pids);
            if (killingWorkers.get(workerId) != null) continue;
            killingWorkers.put(workerId, TimeUtils.current_time_secs());
            LOG.info("Begin to shut down " + topologyId + ":" + workerId);
            try {
                String threadPid = workerThreadPids.get(workerId);
                if (threadPid != null) {
                    ProcessSimulator.killProcess(threadPid);
                    localMode = true;
                    continue;
                }
                for (String pid : pids) {
                    JStormUtils.process_killed(Integer.parseInt(pid));
                }
                if (taskCleanupTimeoutMap != null && taskCleanupTimeoutMap.get(topologyId) != null) {
                    maxWaitTime = Math.max(maxWaitTime, taskCleanupTimeoutMap.get(topologyId));
                    continue;
                }
                maxWaitTime = Math.max(maxWaitTime, ConfigExtension.getTaskCleanupTimeoutSec(conf));
            }
            catch (Exception e) {
                LOG.info("Failed to shutdown ", (Throwable)e);
            }
        }
        if (block) {
            JStormUtils.sleepMs(maxWaitTime);
        }
        for (Map.Entry<String, String> entry : removed.entrySet()) {
            workerId = entry.getKey();
            topologyId = entry.getValue();
            pids = (List)workerId2Pids.get(workerId);
            int cleanupTimeout = taskCleanupTimeoutMap != null && taskCleanupTimeoutMap.get(topologyId) != null ? taskCleanupTimeoutMap.get(topologyId) : ConfigExtension.getTaskCleanupTimeoutSec(conf);
            int initCleaupTime = killingWorkers.get(workerId);
            if (TimeUtils.current_time_secs() - initCleaupTime <= cleanupTimeout) continue;
            if (!localMode) {
                for (String pid : pids) {
                    JStormUtils.ensure_process_killed(Integer.parseInt(pid));
                    if (cgroupManager == null) continue;
                    cgroupManager.shutDownWorker(workerId, true);
                }
            }
            ShutdownWork.tryCleanupWorkerDir(conf, workerId);
            LOG.info("Successfully shut down " + workerId);
            killingWorkers.remove(workerId);
        }
    }

    public static void shutWorker(Map conf, String supervisorId, Map<String, String> removed, boolean block) {
        String workerId;
        HashMap<String, List<String>> workerId2Pids = new HashMap<String, List<String>>();
        int maxWaitTime = 0;
        maxWaitTime = Math.max(maxWaitTime, ConfigExtension.getTaskCleanupTimeoutSec(conf));
        HashMap<String, Integer> killingWorkers = new HashMap<String, Integer>();
        for (Map.Entry<String, String> entry : removed.entrySet()) {
            workerId = entry.getKey();
            String topologyId = entry.getValue();
            List<String> pids = null;
            try {
                pids = ShutdownWork.getPid(conf, workerId);
            }
            catch (IOException e1) {
                LOG.error("Failed to get pid for " + workerId + " of " + topologyId);
            }
            workerId2Pids.put(workerId, pids);
            if (killingWorkers.get(workerId) != null) continue;
            killingWorkers.put(workerId, TimeUtils.current_time_secs());
            LOG.info("Begin to shut down " + topologyId + ":" + workerId);
            try {
                for (String pid : pids) {
                    JStormUtils.process_killed(Integer.parseInt(pid));
                }
            }
            catch (Exception e) {
                LOG.info("Failed to shutdown ", (Throwable)e);
            }
        }
        if (block) {
            JStormUtils.sleepMs(maxWaitTime * 1000);
        }
        for (Map.Entry<String, String> entry : removed.entrySet()) {
            workerId = entry.getKey();
            List pids = (List)workerId2Pids.get(workerId);
            int cleanupTimeout = ConfigExtension.getTaskCleanupTimeoutSec(conf);
            int initCleanUpTime = (Integer)killingWorkers.get(workerId);
            if (TimeUtils.current_time_secs() - initCleanUpTime <= cleanupTimeout) continue;
            for (String pid : pids) {
                JStormUtils.ensure_process_killed(Integer.parseInt(pid));
            }
            ShutdownWork.tryCleanupWorkerDir(conf, workerId);
            LOG.info("Successfully shut down " + workerId);
            killingWorkers.remove(workerId);
        }
    }

    public static void tryCleanupWorkerDir(Map conf, String workerId) {
        try {
            PathUtils.rmr(StormConfig.worker_heartbeats_root(conf, workerId));
            PathUtils.rmr(StormConfig.worker_pids_root(conf, workerId));
            PathUtils.rmr(StormConfig.worker_root(conf, workerId));
        }
        catch (Exception e) {
            LOG.warn(e + "Failed to cleanup worker " + workerId + ". Will retry later");
        }
    }

    public static List<String> getPid(Map conf, String workerId) throws IOException {
        String workerPidPath = StormConfig.worker_pids_root(conf, workerId);
        List<String> pids = PathUtils.read_dir_contents(workerPidPath);
        return pids;
    }
}

